Python的魅力:从简记博客到Linux运维工具

1、引言

由于信创要求及其他客观条件因素公司产品迭代,服务器开始转向Linux,数据库和中间件也开始选用国产方案。在公司技术转型过程中,Python又进入了我的生活,最早期学习了一段时间,了解了一些基本知识,介于当时没有良好的应用环境逐渐荒废。

Python 作为一种高级编程语言,以其简洁的语法、丰富的第三方库和强大的社区支持,吸引了越来越多的开发者。本文将通过两个实际案例——简记博客的改版和Linux运维工具的开发,展示Python在不同场景下的应用及其带来的便利。

2、简记博客的改版

2.1、背景

简记博客是一款个人博客系统,最初版本使用ASP和ACCESS构建。随着技术的发展和个人需求的变化,决定将其改版为基于Python的版本。Python 的简洁语法和强大的Web框架使得这一改版变得轻松而高效。

2.2、技术栈

  • Flask:轻量级的Web框架,适合快速开发小型应用。
  • Mysql:后台数据库。
  • SQLAlchemy:ORM(对象关系映射)工具,简化数据库操作。
  • Jinja2:模板引擎,用于动态生成HTML页面。
  • Layui:前端框架,提供响应式设计和美观的界面。

2.3、改版步骤

  1. 环境准备

安装Python和相关依赖:

pip install Flask SQLAlchemy Jinja2
  1. 项目结构

创建项目目录结构:

jianji/
├── app.py
├── db.py
├── templates/
│   └── index.html/.....
├── static/
│   └── css/
│       └── main.css/....
└── models.py
  1. 数据库模型

使用SQLAlchemy定义数据库模型:

 from flask_sqlalchemy import SQLAlchemy

     db = SQLAlchemy()

     class Post(db.Model):
     id = db.Column(db.Integer, primary_key=True)
     title = db.Column(db.String(100), nullable=False)
     content = db.Column(db.Text, nullable=False)
    ......

 def init_db(app):
     with app.app_context():
     db.create_all()
  1. 路由和视图

定义路由和视图函数:

from flask import Flask, render_template, request, redirect, url_for.....
from db import count_total_users, email_exists, get_articles, count_total_articles, ......
import markdown, hashlib
from datetime import datetime

app = Flask(__name__)
app.secret_key = '**********************'  # 用于安全的会话管理

# 首页路由
@app.route('/')
def index():
    page = request.args.get('page', 1, type=int)  # 获取当前页码,默认为1
    per_page = 18  # 每页展示的文章数
    query = request.args.get('query', None)  # 获取搜索查询
    lm = request.args.get('lm', None)  
    ic = request.args.get('ic', None)

    if query:  # 如果有查询
        articles = search_articles(query,lm,ic, page, per_page)  # 获取搜索结果
        total_articles = count_total_search_results(query)  # 计算匹配的总文章数
    else:  # 如果没有查询
        articles = get_articles(lm,ic,page, per_page)  # 获取当前页的文章列表
        total_articles = count_total_articles()  # 获取总文章数

    # 格式化 intime 字段
    for article in articles:
    article['intime'] = article['intime'].strftime('%Y年%m月%d日')  # 格式化为字符串

    total_pages = (total_articles + per_page - 1) // per_page  # 计算总页数

return render_template('index.html', articles=articles, page=page, total_pages=total_pages, query=query, lm=lm, ic=ic)

# 加载更多文章的路由
@app.route('/load_more_articles', methods=['GET'])
def load_more_articles():
    page = request.args.get('page', 1, type=int)  # 获取请求的页码
    per_page = 18  # 每次加载的文章数量
    query = request.args.get('query', None)  # 获取搜索查询
    lm = request.args.get('lm', None)  
    ic = request.args.get('ic', None)

if query:  # 如果有查询
    articles = search_articles(query, lm, ic, page, per_page)  # 获取搜索结果
    total_articles = count_total_search_results(query)  # 计算匹配的总文章数
else:  # 如果没有查询
    articles = get_articles(lm, ic, page, per_page)  # 获取当前页的文章列表
    total_articles = count_total_articles()  # 获取总文章数

# 格式化 intime 字段
for article in articles:
    article['intime'] = article['intime'].strftime('%Y年%m月%d日')  # 格式化为字符串
    has_more = (page * per_page) < total_articles  # 检查是否还有更多文章

return jsonify({
    'articles': articles,
    'hasMore': has_more
})

.......

 if __name__ == '__main__':
     init_db(app)
     app.run(debug=True)
  1. 前端页面

使用Jinja2模板引擎创建HTML页面:

<!DOCTYPE html>
<html lang="zh">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>简记 - 极简博客系统</title>
        <link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}">
        <link href="{{ url_for('static', filename='ui/layui/css/layui.css') }}" rel="stylesheet" type="text/css" />
        <link href="{{ url_for('static', filename='ui/main.css') }}" rel="stylesheet" type="text/css" />
        ......
    </head>

    <body>
        <!-- 引入导航栏 -->
        {% include 'navbar.html' %}

        <!-- 文章列表栏 -->
        <div id="container" class="layui-container macy-container">
            <div class="layui-row">
                <div class="layui-col-xs12">
                    <div class="layui-tab layui-tab-brief">
                        <div class="waterfall list-loop-d" id="macy-container">
                            {% for article in articles %}
                            <div class="loop-d layui-anim layui-anim-up">
                                <div class="card">
                                    <div class="card__img">
                                        <a href="{{ url_for('article_detail', article_id=article.id) }}">
                                            {% if article.pic == None %}
                                            <img class="card__picture" src="static/upload/img/7276097257620.png">
                                            {% else %}
                                            <img class="card__picture" src=" {{ article.pic }}">
                                            {% endif %}
                                            <span class="slabel"></span>
                                        </a>
                                    </div>
                                    <div class="card-infos">
                                        <a href="{{ url_for('article_detail', article_id=article.id) }}" class="card__title">
                                            {% if article.visitpwd %}
                                            <i class="layui-icon layui-icon-auz layui-tips"> </i>
                                            {% endif %}
                                            {{ article.name }}
                                        </a>
                                        <article class="card__text">
                                            {{ article.stract }}
                                        </article>
                                        <em class="emtype">
                                            {{ article.intime }}
                                            <a href="?lm={{ article.lm }}" class="lmtype">
                                                <i class="layui-icon layui-icon-note"></i>{{ article.lm }}
                                            </a>
                                        </em>
                                    </div>
                                </div>
                            </div>
                            {% endfor %}
                        </div>
                    </div>
                </div>

                <div class="layui-row layui-margin-top layui-text-center">
                    <div class="layui-col-xs12" id="load-more-container">
                        <!-- 加载更多按钮 -->
                        <button class="layui-btn layui-btn-normal layui-btn-sm load-more-btn" data-current-page="1">
                            加载更多
                        </button>
                    </div>
                </div>
            </div>
        </div>

        <!-- 引入底部栏 -->
        {% include 'footer.html' %}
    </body>
</html>

2.4、成果

通过使用Python和Flask框架,简记博客的改版不仅提高了开发效率,还增强了系统的可维护性和扩展性。简洁的语法和强大的社区支持使得开发者能够更快地实现功能并解决问题。

3、Linux运维工具的开发

3.1、背景

在日常的Linux运维工作中,经常需要进行系统监控、日志分析、自动化部署等任务。传统的脚本语言如Shell虽然简单,但在处理复杂任务时显得力不从心。Python 的强大功能和丰富的第三方库使其成为开发运维工具的理想选择。

3.2、技术栈

  • psutil:跨平台库,用于获取系统信息和进程信息。
  • paramiko:用于SSH连接和远程命令执行。
  • pandas:数据处理和分析库。
  • matplotlib:数据可视化库。

3.3、工具开发

  1. 系统监控工具

使用psutil获取系统资源使用情况:

import psutil

def get_system_info(): cpu_percent = psutil.cpu_percent(interval = 1)
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage('/')
network_info = psutil.net_io_counters()
return {
    'CPU Usage': cpu_percent,
    'Memory Usage': memory_info.percent,
    'Disk Usage': disk_info.percent,
    'Network Sent': network_info.bytes_sent,
    'Network Received': network_info.bytes_recv
}

if __name__ == '__main__': system_info = get_system_info()
print(system_info)
  1. 日志分析工具

使用pandas处理日志文件:

import pandas as pd

def analyze_log(log_file):
    df = pd.read_csv(log_file, sep = ' ', header = None, names = ['timestamp', 'level', 'message'])
error_count = df[df['level'] == 'ERROR'].shape[0]
warning_count = df[df['level'] == 'WARNING'].shape[0]

return {
    'Total Errors': error_count,
    'Total Warnings': warning_count
}

if __name__ == '__main__':
    log_file = 'system.log'
log_analysis = analyze_log(log_file)
print(log_analysis)
  1. 自动化部署工具

使用paramiko进行远程服务器管理:

import paramiko  # type: ignore
import os
from tqdm import tqdm  # type: ignore
import time  # 导入time模块以便进行重试

def ssh_login(hostname, port, username, password):
    # 创建 SSH 客户端
    client = paramiko.SSHClient()

    # 自动添加主机密钥到 known_hosts
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    while True:
        try:
            # 连接到服务器
            client.connect(hostname, port, username, password)
            print("连接成功!")
            break  # 如果连接成功,跳出循环
        except paramiko.AuthenticationException:
            print("用户名或密码错误,请重新输入。")
            username = input("请输入用户名: ")
            password = input("请输入密码: ")
        except Exception as e:
            print(f"连接出错,服务器无响应: {e}")
            return

    # 检查内核版本
    stdin, stdout, stderr = client.exec_command('uname -r')
    kernel_version = stdout.read().decode().strip()
    print(f"内核版本: {kernel_version}")

    # 检查CPU架构与核心数
    stdin, stdout, stderr = client.exec_command('lscpu')
    cpu_info = stdout.read().decode().strip()
    print(f"CPU信息:\n{cpu_info}")

    # 检查存储空间
    stdin, stdout, stderr = client.exec_command('df -h')
    disk_space = stdout.read().decode()
    print("存储空间:\n", disk_space)

    # 检查内存空间
    stdin, stdout, stderr = client.exec_command('free -h')
    memory_info = stdout.read().decode()
    print("内存空间:\n", memory_info)

    # 检查unzip和GCC环境
    stdin, stdout, stderr = client.exec_command('command -v unzip')
    unzip_status = stdout.read().decode().strip()
    if unzip_status:
        print("unzip已安装。")
    else:
        print("unzip未安装。")
        upload_unzip = input("是否上传本地包并进行安装?(y/n): ").strip().lower()
        if upload_unzip == 'y':
            # 检查目标系统支持的包管理工具
            stdin, stdout, stderr = client.exec_command('command -v rpm')
            rpm_available = stdout.read().decode().strip()

            stdin, stdout, stderr = client.exec_command('command -v dpkg')
            dpkg_available = stdout.read().decode().strip()

            if rpm_available and not dpkg_available:
                package_format = 'unzip.rpm'
            elif dpkg_available and not rpm_available:
                package_format = 'unzip_amd64.deb'
            elif not rpm_available and not dpkg_available:
                print("错误:目标系统不支持 rpm 或 dpkg,无法安装 unzip。")
                return
            else:
                package_format = 'unzip.rpm 或 unzip_amd64.deb'

            local_unzip_path = input(f"请根据操作系统选择对应的包,然后输入 {package_format} 文件的本地路径: ")

            # 确认文件存在
            if os.path.isfile(local_unzip_path):
                folder_name = "unzip_package"  # 你可以根据需求修改文件夹名称
                remote_path = f"/{folder_name}"
                create_remote_directory(client, remote_path)

                target_path = f"{remote_path}/unzip_package_file"
                sftp = client.open_sftp()
                # 上传 unzip 文件
                sftp.put(local_unzip_path, target_path)
                sftp.close()
                print(f"文件 '{local_unzip_path}' 成功上传到 '{target_path}'。")

                # 判断文件类型,选择安装命令
                if local_unzip_path.endswith('.rpm'):
                    install_command = f'sudo rpm -ivh --nodeps {target_path}'  # 添加 --nodeps
                elif local_unzip_path.endswith('.deb'):
                    install_command = f'sudo dpkg -i {target_path}'
                else:
                    print("错误:文件类型不受支持,请确保是 .rpm 或 .deb 包。")
                    exit(1)

                # 安装unzip
                stdin, stdout, stderr = client.exec_command(install_command)
                output = stdout.read().decode()
                error = stderr.read().decode()

                if output:
                    print(f"安装输出:\n{output}")
                if error:
                    print(f"安装错误:\n{error}")
                    print("安装失败,请检查错误信息。")
                else:
                    print("unzip已安装。")

                # 确认安装成功后,重新检查 unzip 是否可用
                stdin, stdout, stderr = client.exec_command('command -v unzip')
                unzip_status = stdout.read().decode().strip()
                if unzip_status:
                    print("unzip 已安装并可用。")
                else:
                    print("unzip 安装失败,请检查。")
            else:
                print("指定的文件不存在,请检查路径。")
                return

    stdin, stdout, stderr = client.exec_command('command -v gcc')
    gcc_status = stdout.read().decode().strip()
    if gcc_status:
        print("GCC已安装。")
    else:
        print("GCC未安装。")

    # 选择模式
    mode = input("请选择模式 (1: 远程安装模式, 2: 纯命令模式): ").strip()
    if mode == '1':
        remote_install_mode(client)
    elif mode == '2':
        command_mode(client)
    else:
        print("无效的选择,退出程序。")
        client.close()

def create_remote_directory(client, remote_directory):
    try:
        mkdir_command = f'mkdir -p {remote_directory}'
        client.exec_command(mkdir_command)
    except Exception as e:
        print(f"创建目录时出错: {e}")

def remote_install_mode(client):
    # 用户输入要进入的路径
    remote_path = input("请输入要进入的远程路径(不需以'/'开头): ").lstrip('/')
    remote_path = f'/{remote_path}'

    # 进入指定路径
    command = f'cd {remote_path}'
    stdin, stdout, stderr = client.exec_command(command)
    error = stderr.read().decode()
    if error:
        print(f"进入路径时出错: {error}")
        return

    # 用户输入要创建的远程文件夹名称
    folder_name = input("请输入要在远程创建的文件夹名称: ")

    # 检查远程文件夹是否已经存在
    check_folder_command = f'if [ ! -d "{remote_path}/{folder_name}" ]; then mkdir {remote_path}/{folder_name}; fi'
    stdin, stdout, stderr = client.exec_command(check_folder_command)
    error = stderr.read().decode()
    if error:
        print(f"检查或创建文件夹时出错: {error}")
        return

    print(f"文件夹 '{folder_name}' 已创建或已存在。")

    # 用户输入要上传的本地文件或文件夹路径
    local_path = input("请输入要上传的本地文件或文件夹路径: ")

    # 检查输入是文件还是文件夹
    if os.path.isdir(local_path):  # 如果是文件夹
        sftp = client.open_sftp()
        for root, dirs, files in os.walk(local_path):
            for filename in files:
                local_file_path = os.path.join(root, filename)
                # 计算在远程的目标路径,确保路径使用 '/'
                target_path = f"{remote_path}/{folder_name}/{os.path.relpath(local_file_path, local_path).replace('\\', '/')}"

                # 创建目录结构,添加确认检查
                target_dir = os.path.dirname(target_path)

                # 确保目标目录存在(创建)
                create_remote_directory(client, target_dir)

                file_size = os.path.getsize(local_file_path)

                # 再次确认目标路径的目录是否存在
                stdin, stdout, stderr = client.exec_command(f'if [ ! -d "{target_dir}" ]; then exit 1; fi')
                error = stderr.read().decode()
                if error:
                    print(f"目标目录 '{target_dir}' 不存在,跳过文件 '{local_file_path}' 的上传。")
                    continue  # 跳过此文件的上传

                with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"上传进度 - {filename}") as pbar:
                    def callback(transferred, total):
                        pbar.n = transferred
                        pbar.refresh()

                    # 上传文件
                    sftp.put(local_file_path, target_path, callback=callback)

                print(f"文件 '{local_file_path}' 成功上传到 '{target_path}'。")

        sftp.close()

    elif os.path.isfile(local_path):  # 如果是单个文件
        target_path = f"{remote_path}/{folder_name}/{os.path.basename(local_path)}"

        sftp = client.open_sftp()
        file_size = os.path.getsize(local_path)

        target_dir = os.path.dirname(target_path)
        create_remote_directory(client, target_dir)  # 确保目录存在

        # 再次确认目标路径的目录是否存在
        stdin, stdout, stderr = client.exec_command(f'if [ ! -d "{target_dir}" ]; then exit 1; fi')
        error = stderr.read().decode()
        if error:
            print(f"目标目录 '{target_dir}' 不存在,无法上传文件 '{local_path}'。")
            return  # 返回,不再继续上传

        with tqdm(total=file_size, unit='B', unit_scale=True, desc=f"上传进度 - {os.path.basename(local_path)}") as pbar:
            def callback(transferred, total):
                pbar.n = transferred
                pbar.refresh()

            sftp.put(local_path, target_path, callback=callback)

        sftp.close()
        print(f"文件 '{local_path}' 成功上传到 '{target_path}'。")
    else:
        print("指定的路径无效,请确认文件或文件夹路径。")
        return

    # 询问用户是否解压并安装
    unzip = input("是否开始解压安装包?(y/n): ").strip().lower()
    if unzip == 'y':
        print("开始解压安装包...")

        # 假设用户上传了一个 zip 文件
        stdin, stdout, stderr = client.exec_command(f'unzip {remote_path}/{folder_name}/*.zip -d {remote_path}/{folder_name}') 
        output = stdout.read().decode()
        error = stderr.read().decode()

        if output:
            print(f"输出:\n{output}")
        if error:
            print(f"解压安装包时出错: {error}")
        else:
            print("安装包解压成功!")

    install = input("是否开始安装?(y/n): ").strip().lower()
    if install == 'y':
        print("开始安装...")
        stdin, stdout, stderr = client.exec_command(f'cd {remote_path}/{folder_name} && chmod +x regAll.sh && bash regAll.sh')  

        # 实时输出stdout和stderr,并处理特定的项目
        while True:
            time.sleep(2)  # 适当的等待时间,以避免过于频繁的读操作
            output_line = stdout.readline()
            error_line = stderr.readline()

            if output_line:
                output_line = output_line.strip()
                print(f"输出: {output_line}")

                # 根据输出内容进行项目处理
                if "是否立即重启服务器" in output_line:
                    reboot_choice = input("请输入你的选择 [y/N]: ").strip().lower()
                    stdin.write(f"{reboot_choice}\n")  # 将用户的选择写入到stdin
                    stdin.flush()  # 确保选择被发送

            if error_line:
                print(f"错误: {error_line.strip()}")

            if output_line == '' and error_line == '' and stdout.channel.exit_status_ready():
                break  # 如果两个流都没有输出,并且命令已经结束,则退出循环

        print("安装过程已完成。")

    print("退出远程安装模式。")

def command_mode(client):
    current_dir = '/'  # 初始工作目录为根目录
    while True:
        command = input(f"当前目录: {current_dir}\n请输入要执行的命令 (输入'exit'退出): ").strip()
        if command.lower() == 'exit':
            break

        # 处理 cd 命令
        if command.startswith('cd '):
            new_dir = command[3:].strip()
            if new_dir.startswith('/'):
                current_dir = new_dir
            else:
                current_dir = f"{current_dir.rstrip('/')}/{new_dir}"
            continue 

        cd_command = f'cd {current_dir}; {command}'
        stdin, stdout, stderr = client.exec_command(cd_command)
        output = stdout.read().decode()
        error = stderr.read().decode()

        if output:
            print(f"输出:\n{output}")
        if error:
            print(f"错误:\n{error}")

    print("退出命令模式。")

# 接收用户输入
hostname = input("请输入服务器IP地址: ") 
port = int(input("请输入端口(默认为22): ")
username = input("请输入用户名: ") 
password = input("请输入密码: ") 

# 登录并执行命令
ssh_login(hostname, port, username, password)

3.4、成果

通过使用Python开发的运维工具,大大提高了工作效率和系统的稳定性。丰富的第三方库使得开发过程更加便捷,同时Python的跨平台特性也使得这些工具可以在不同的操作系统上运行。

4、结语

Python 的简洁语法和强大的生态系统使其成为现代开发者的首选语言之一。无论是Web开发还是系统运维,Python 都能提供高效、可靠的解决方案。希望本文的案例能激发更多运维人员探索Python的无限可能,为自己的项目带来更多的创新和便利。

文章目录