由于运维工作需要,经常需要将一些数据从数据库中导出,发送给运营和需求部门,天天去手动查询,又有点太费时间了,于是研究学习了Python的基本功能,通过Python脚本和Linux 的crontab命令实现了每天自动化的数据查询和邮件发送,今天小编就来说说关于python邮件发送教程?下面更多详细答案一起来看看吧!
python邮件发送教程
背景由于运维工作需要,经常需要将一些数据从数据库中导出,发送给运营和需求部门,天天去手动查询,又有点太费时间了,于是研究学习了Python的基本功能,通过Python脚本和Linux 的crontab命令实现了每天自动化的数据查询和邮件发送。
代码实现定义了以下几个代码模块实现了配置文件读取、日志记录、数据库连接访问查询、导出到xlsx和带附件的邮件发送功能。1、demo.py 示例文件2、config.yml 配置文件3、common_log.py 实现日志记录4、common_db.py 实现数据库连接和访问5、common_xlsx.py 实现数据表格的处理6、common_email.py 实现带附件的邮件发送
1、demo.py 示例# coding: utf-8import common_db as mydbimport common_xlsx as my_xlsximport common_log as mylogimport common_email as my_emailif __name__ == "__main__": phone = '13********7' sql = """ select orderno,orderAmount,actualAmount,phone order where phone= '{0}' """.format(phone) # 查询订单 result = mydb.select_by_parameters(sql) filename = "测试.xlsx" sheet_name = '订单查询' # 邮件接收人 receivers = "zhangsan@aliyun.com" receivers_cc = "lisi@aliyun.com" # 邮件抄送人 if len(result) > 0: # 创建工作表 my_xlsx.create_xlsx(filename) mylog.logger.info("创建工作表成功" filename) # 将查询结果放入工作表 my_xlsx.create_sheet_in_xlsx(filename, sheet_name, result,0) mylog.logger.info("创建工作簿成功" sheet_name) # 对工作簿求和 my_xlsx.sum_col_for_sheet(filename, sheet_name) mylog.logger.info("求和汇总成功") # 删除空工作表 my_xlsx.delete_sheet_from_xlsx(filename,'Sheet') # 发送邮件 my_email.to_send_email("测试查询结果", receivers, receivers_cc, filename, "订单查询结果.xlsx") mylog.logger.info("发送邮件成功")
mysql: host: 192.168.x.x port: 3306 username: xxxx password: xxxx database: xxxxlog: log_path: D:\log log_size: 8 log_num: 3email: smtp: smtp.126.com # 发送方邮件地址 from: xxxx@aliyun.com # 发送方授权码 password: fdsafafdsafsa
import logging.handlersimport loggingimport yamlimport osimport sys# 提供日志功能class logger: # 先读取XML文件中的配置数据 # 由于config.xml放置在与当前文件相同的目录下,因此通过 __file__ 来获取XML文件的目录,然后再拼接成绝对路径 # 这里利用了lxml库来解析XML # root = etree.parse(os.path.join(os.path.dirname(__file__), 'config.xml')).getroot() # 先读取yml中的配置数据 # 由于 # 读取日志文件保存路径 with open('config.yml', 'r') as f: result = yaml.load(f, Loader=yaml.FullLoader) config_log = result["log"] logpath = config_log['log_path'] # root.find('logpath').text # 读取日志文件容量,转换为字节 logsize = 1024*1024*int(config_log['log_size']) # 读取日志文件保存个数 lognum = int(config_log['log_num']) # 日志文件名:由用例脚本的名称,结合日志保存路径,得到日志文件的绝对路径 logname = os.path.join(logpath, sys.argv[0].split('/')[-1].split('.')[0]) ".log" # 初始化logger log = logging.getLogger() # 日志格式,可以根据需要设置 fmt = logging.Formatter('[%(asctime)s][%(filename)s][line:%(lineno)d][%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') # 日志输出到文件,这里用到了上面获取的日志名称,大小,保存个数 handle1 = logging.handlers.RotatingFileHandler(logname, maxBytes=logsize, backupCount=lognum) handle1.setFormatter(fmt) # 同时输出到屏幕,便于实施观察 handle2 = logging.StreamHandler(stream=sys.stdout) handle2.setFormatter(fmt) log.addHandler(handle1) log.addHandler(handle2) # 设置日志基本,这里设置为INFO,表示只有INFO级别及以上的会打印 log.setLevel(logging.INFO) # 日志接口,用户只需调用这里的接口即可,这里只定位了INFO, WARNING, ERROR三个级别的日志,可根据需要定义更多接口 @classmethod def info(cls, msg): cls.log.info(msg) return @classmethod def warning(cls, msg): cls.log.warning(msg) return @classmethod def error(cls, msg): cls.log.error(msg) return
import timeimport pymysqlfrom common_log import *# 连接数据库def get_connection(): _conn_status = True _max_retries_count = 10 # 设置最大重试次数 _conn_retries_count = 0 # 初始重试次数 _conn_timeout = 3 # 连接超时时间为3秒 with open('config.yml', 'r') as f: result = yaml.load(f, Loader=yaml.FullLoader) config_mysql = result["mysql"] while _conn_status and _conn_retries_count <= _max_retries_count: try: connect = pymysql.connect(host=config_mysql['host'], user=config_mysql['username'], password=config_mysql['password'], database=config_mysql['database'], port=config_mysql['port']) _conn_status = False # 如果conn成功则_status为设置为False则退出循环,返回db连接对象 logger.info("连接数据库成功") return connect except Exception as e: _conn_retries_count = 1 logger.info("第%s次连接数据库失败"%(_conn_retries_count)) logger.error(e) time.sleep(3) continue# 查询函数def select_by_parameters(sql, params=None): try: connect = get_connection() cursor = connect.cursor(pymysql.cursors.DictCursor) cursor.execute(sql, params) result = cursor.fetchall() return result except Exception as e: logger.error("执行查询报错") logger.info(sql) logger.error(e) finally: try: cursor.close() except Exception as e: logger.error("关闭游标对象报错") logger.error(e) try: connect.close() except Exception as e: print(e) print("数据库链接关闭异常")
import openpyxlimport osfrom openpyxl.styles import PatternFill, Border, Side, Alignment, Protection, Font, colorsfrom openpyxl.utils import get_column_letter# 定义边框thin_border = Border(left=Side(style='thin', color='FFFFFF'), right=Side(style='thin', color='FFFFFF'), top=Side(style='thin', color='FFFFFF'), bottom=Side(style='thin', color='FFFFFF'))# 居中对齐alignment_center = Alignment(horizontal='center', vertical='center')# 右对齐alignment_right = Alignment(horizontal='center', vertical='center')# 双行填充fill_double = PatternFill(fgColor='FFDCE6F1', fill_type='solid')# 单行填充fill_single = PatternFill(fgColor='FFB8CCE4', fill_type='solid')# 表头填充fill_head = PatternFill(fgColor='FF366092', fill_type='solid')font_head = Font(bold=True, color='FFFFFFFF')# 1、创建xlsx脚本——在指定的filepath,创建指定的filename的xlsx文件def create_xlsx(filename): wb = openpyxl.Workbook() wb.save(filename)# 2、增加工作簿脚本——读取指定路径下的xlsx文件,在工作表第index位置增加一个工作簿,并将mysql查询结果result写入到该工作簿def create_sheet_in_xlsx(filename, sheet_name, result, index): # 加载文件 wb = openpyxl.load_workbook(filename) # 在指定位置创建工作表 wb.create_sheet(sheet_name, index) # 获取新建的工作表 ws = wb[sheet_name] j = 1 if len(result) > 0: # 写表头 for key, value in (result[0].items()): ws.cell(1, j, format(key)).border = thin_border # 定义对齐方式 ws.cell(1, j).alignment = alignment_center # 字体 颜色为白色 ws.cell(1, j).font = font_head # 填充 ws.cell(1, j).fill = fill_head # 边框 j = j 1 ws.row_dimensions[1].height = 30 # 写数据 # 根据结果集行数量进行循环 for i in range(len(result)): # 循环当前行,定义j变量为列使用 j = 1 for key, value in (result[i].items()): if i % 2 == 0:ws.cell(i 2, j).fill = fill_double else:ws.cell(i 2, j).fill = fill_single ws.cell(i 2, j, value) ws.cell(i 2, j).border = thin_border ws.cell(i 2, j).alignment = alignment_center ws.row_dimensions[i 2].height = 20 if '时间' in format(key):ws.cell(i 2, j).alignment = alignment_centerws.cell(i 2, j).number_format = 'yyyy-mm-dd hh:mm:ss'j = j 1continue # 如果字段名称是数量的话,不保留小数 if '数量' in format(key):ws.cell(i 2, j).number_format = '0'j = j 1continue # 如果是字段名称包含金额的话,则右对齐 if '金额' in format(key):ws.cell(i 2, j).alignment = alignment_rightws.cell(i 2, j).number_format = '#,##0.00'j = j 1continue j = j 1 # 保存工作表 wb.save(filename)# 3、删除工作簿def delete_sheet_from_xlsx(filename,sheet_name): wb = openpyxl.load_workbook(filename) wb.remove_sheet(wb[sheet_name]) wb.save(filename)# 4、往工作簿中追加行 将mysql的查询结果追加到工作表sheet_name末尾def add_result_to_sheet(filename,new_sheet_name,result): wb = openpyxl.load_workbook(filename) ws = wb[new_sheet_name] # 获取最大行 mr = ws.max_row if len(result) > 0: for i in range(len(result)): # 循环当前行,定义j变量为列使用 j = 1 for key, value in (result[i].items()): if i % 2 == 0:ws.cell(i mr, j).fill = fill_double else:ws.cell(i mr, j).fill = fill_single ws.cell(i mr, j, value) ws.cell(i mr, j).border = thin_border ws.cell(i mr, j).alignment = alignment_center ws.row_dimensions[i 2].height = 20 if '时间' in format(key):ws.cell(i mr, j).number_format = 'yyyy-mm-dd hh:mm:ss'j = j 1continue # 如果字段名称是数量的话,不保留小数 if '数量' in format(key):ws.cell(i mr, j).number_format = '0'j = j 1continue # 如果是字段名称包含金额的话,则右对齐 if '金额' in format(key):ws.cell(i mr, j).alignment = alignment_rightws.cell(i mr, j).number_format = '#,##0.00'j = j 1continue j = j 1 wb.save(filename)# 5、删除工作簿中最大行def delete_max_row_from_sheet(filename,sheet_name): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] ws.delete_rows(ws.max_row) wb.save(filename)# 6、删除工作簿中指定列def delete_col_from_sheet(filename,sheet_name,colno): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] ws.delete_cols(colno) wb.save(filename)# 7、对表格中金额和数量字段进行求和def sum_col_for_sheet(filename,sheet_name): wb = openpyxl.load_workbook(filename) ws = wb[sheet_name] for col in list(ws.columns): l = [c.value for c in col] if '金额' in l[0] or '数量' in l[0]: # 本列行的数量 row_size = len(col) 1 # 本列的列号是 col_no = col[1].column col_code = get_column_letter(col[1].column) ws.cell(row_size, col_no, "=sum(" str(col_code) str(2) ":" str(col_code) str( row_size - 1) ")").border = thin_border ws.cell(row_size, col_no).font = font_head # 填充 ws.cell(row_size, col_no).fill = fill_head # 测试添加样式 if '金额' in l[0]: ws.cell(row_size, col_no).number_format = '#,##0.00' ws.cell(row_size, col_no).alignment = alignment_right if '数量' in l[0]: ws.cell(row_size, col_no).number_format = '0' ws.cell(row_size, col_no).alignment = alignment_center ws.row_dimensions[col_no].height = 20 wb.save(filename)
from email.mime.text import MIMETextfrom email.header import Headerfrom email.mime.multipart import MIMEMultipartfrom smtplib import SMTP_SSLfrom common_log import *import yaml## file_Name是路径名称加文件名和扩展名;# new_file_name是在邮件附件中显示的名称# receivers是收件人列表,中间逗号隔开# receivers_cc是抄送人列表# mail_subject是邮件主题def to_send_email(mail_subject,receivers,receivers_cc,file_name,new_file_name): with open('config.yml', 'r') as f: result = yaml.load(f, Loader=yaml.FullLoader) config_email = result["email"] password = config_email['password'] msg = MIMEMultipart('related') msgAlternative = MIMEMultipart('alternative') msgAlternative.attach(MIMEText("<h1>见附件</h1> <br />", "html", "utf-8")) msg.attach(msgAlternative) # file_name 是指文件路径加名称和扩展名 file1 = MIMEText( open(file_name, 'rb').read(), 'base64', 'utf-8' ) file1["Content-Type"] = 'application/octet-stream' # new_file_name是指邮件附件中显示的名称 file1.add_header('Content-Disposition', 'attachment',filename=new_file_name) msg.attach(file1) msg['Subject'] = Header(mail_subject, 'utf-8').encode() msg['From'] = config_email['from'] msg['To'] = receivers msg['Cc'] = receivers_cc try: smtp = SMTP_SSL(config_email['smtp']) smtp.login(msg['From'], password) smtp.sendmail(msg['From'], msg['To'].split(',') msg['Cc'].split(','), msg.as_string()) logger.info("发送邮件成功,邮件接收人是:%s,邮件抄送人是:%s"%(receivers, receivers_cc)) except Exception as e: logger.error("发送邮件出现错误,邮件接收人是:%s,邮件抄送人是:%s" % (receivers, receivers_cc)) logger.error(e) finally: try: smtp.quit() except Exception as e: logger.error(e)