编程学习
5分钟阅读

Python自动化脚本学习笔记

Python自动化脚本学习笔记

学习背景

作为一名运维工程师,日常工作中面临着大量重复性的任务:服务器状态检查、日志文件分析、批量配置更新、定时任务执行等。手动操作不仅效率低下,还容易出错。于是我决定学习Python来实现工作自动化,提高工作效率和准确性。

学习路径

第一阶段:Python基础语法

刚开始接触Python时,被它简洁优雅的语法所吸引。相比其他编程语言,Python更接近自然语言,学习起来相对容易。

变量和数据类型

# 基础数据类型
name = "运维工程师"          # 字符串
age = 28                    # 整数
salary = 15000.5           # 浮点数
is_working = True          # 布尔值

# 数据结构
servers = ["web1", "web2", "db1"]           # 列表
server_info = {"ip": "192.168.1.100", "port": 22}  # 字典
coordinates = (116.4074, 39.9042)           # 元组
unique_ips = {"192.168.1.1", "192.168.1.2"} # 集合

控制流语句

# 条件判断
if cpu_usage > 80:
    print("CPU使用率过高,需要检查")
elif cpu_usage > 60:
    print("CPU使用率较高,需要关注")
else:
    print("CPU使用率正常")

# 循环处理
for server in servers:
    print(f"正在检查服务器: {server}")
    
# 字典遍历
for key, value in server_info.items():
    print(f"{key}: {value}")

第二阶段:标准库应用

掌握基础语法后,开始学习Python强大的标准库。

系统操作

import os
import sys
import subprocess

# 获取系统信息
print(f"操作系统: {os.name}")
print(f"当前目录: {os.getcwd()}")
print(f"Python版本: {sys.version}")

# 执行系统命令
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)

# 环境变量操作
home_dir = os.environ.get('HOME', '/tmp')
os.environ['MY_VAR'] = 'my_value'

文件操作

from pathlib import Path
import shutil

# 路径操作
log_dir = Path('/var/log')
if log_dir.exists():
    for log_file in log_dir.glob('*.log'):
        print(f"日志文件: {log_file}")

# 文件读写
with open('/etc/passwd', 'r') as f:
    users = f.readlines()
    print(f"系统用户数: {len(users)}")

# 文件复制和移动
shutil.copy2('source.txt', 'backup.txt')
shutil.move('old_location.txt', 'new_location.txt')

第三阶段:网络和数据处理

网络编程

import requests
import json
import socket

# HTTP请求
response = requests.get('http://httpbin.org/ip')
if response.status_code == 200:
    data = response.json()
    print(f"公网IP: {data['origin']}")

# 端口检测
def check_port(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(3)
    result = sock.connect_ex((host, port))
    sock.close()
    return result == 0

if check_port('www.baidu.com', 80):
    print("端口80开放")

数据处理

import json
import csv
import re
from datetime import datetime

# JSON处理
config = {
    "servers": ["web1", "web2"],
    "database": {"host": "db1", "port": 3306}
}
with open('config.json', 'w') as f:
    json.dump(config, f, indent=2)

# CSV处理
with open('servers.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['hostname', 'ip', 'status'])
    writer.writerow(['web1', '192.168.1.10', 'running'])

# 正则表达式
log_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)'
log_line = "2024-01-15 10:30:45 [ERROR] Database connection failed"
match = re.match(log_pattern, log_line)
if match:
    timestamp, level, message = match.groups()
    print(f"时间: {timestamp}, 级别: {level}, 消息: {message}")

实际项目经验

项目1:系统监控脚本

第一个完整的Python项目是写一个系统监控脚本。

系统资源监控

#!/usr/bin/env python3
import psutil
import time
import json
from datetime import datetime

class SystemMonitor:
    def __init__(self):
        self.thresholds = {
            'cpu': 80,
            'memory': 85,
            'disk': 90
        }
    
    def get_cpu_usage(self):
        return psutil.cpu_percent(interval=1)
    
    def get_memory_usage(self):
        memory = psutil.virtual_memory()
        return memory.percent
    
    def get_disk_usage(self, path='/'):
        disk = psutil.disk_usage(path)
        return (disk.used / disk.total) * 100
    
    def check_system(self):
        cpu = self.get_cpu_usage()
        memory = self.get_memory_usage()
        disk = self.get_disk_usage()
        
        status = {
            'timestamp': datetime.now().isoformat(),
            'cpu_usage': cpu,
            'memory_usage': memory,
            'disk_usage': disk,
            'alerts': []
        }
        
        if cpu > self.thresholds['cpu']:
            status['alerts'].append(f"CPU使用率过高: {cpu:.1f}%")
        
        if memory > self.thresholds['memory']:
            status['alerts'].append(f"内存使用率过高: {memory:.1f}%")
        
        if disk > self.thresholds['disk']:
            status['alerts'].append(f"磁盘使用率过高: {disk:.1f}%")
        
        return status

if __name__ == "__main__":
    monitor = SystemMonitor()
    while True:
        status = monitor.check_system()
        print(json.dumps(status, indent=2, ensure_ascii=False))
        
        if status['alerts']:
            # 这里可以添加告警逻辑,如发送邮件或钉钉消息
            print("⚠️  系统告警:")
            for alert in status['alerts']:
                print(f"   {alert}")
        
        time.sleep(60)  # 每分钟检查一次

项目2:日志分析工具

编写了一个分析Nginx访问日志的工具:

#!/usr/bin/env python3
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = log_file
        self.log_pattern = re.compile(
            r'(\S+) - - \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\d+) "([^"]*)" "([^"]*)"'
        )
        
    def parse_log_line(self, line):
        match = self.log_pattern.match(line)
        if match:
            return {
                'ip': match.group(1),
                'timestamp': match.group(2),
                'method': match.group(3),
                'url': match.group(4),
                'protocol': match.group(5),
                'status': int(match.group(6)),
                'size': int(match.group(7)),
                'referer': match.group(8),
                'user_agent': match.group(9)
            }
        return None
    
    def analyze(self):
        ip_counter = Counter()
        status_counter = Counter()
        url_counter = Counter()
        error_logs = []
        
        with open(self.log_file, 'r') as f:
            for line_num, line in enumerate(f, 1):
                log_entry = self.parse_log_line(line.strip())
                if log_entry:
                    ip_counter[log_entry['ip']] += 1
                    status_counter[log_entry['status']] += 1
                    url_counter[log_entry['url']] += 1
                    
                    # 收集错误日志
                    if log_entry['status'] >= 400:
                        error_logs.append({
                            'line': line_num,
                            'ip': log_entry['ip'],
                            'status': log_entry['status'],
                            'url': log_entry['url'],
                            'timestamp': log_entry['timestamp']
                        })
        
        return {
            'top_ips': ip_counter.most_common(10),
            'status_codes': dict(status_counter),
            'top_urls': url_counter.most_common(10),
            'error_logs': error_logs[:20]  # 只显示前20个错误
        }
    
    def generate_report(self):
        results = self.analyze()
        
        print("=== Nginx日志分析报告 ===\n")
        
        print("🔥 访问量最高的IP地址:")
        for ip, count in results['top_ips']:
            print(f"   {ip}: {count} 次访问")
        
        print("\n📊 HTTP状态码分布:")
        for status, count in sorted(results['status_codes'].items()):
            print(f"   {status}: {count} 次")
        
        print("\n🌐 访问量最高的URL:")
        for url, count in results['top_urls']:
            print(f"   {url}: {count} 次访问")
        
        if results['error_logs']:
            print("\n❌ 错误日志 (最近20条):")
            for error in results['error_logs']:
                print(f"   行{error['line']}: {error['ip']} - {error['status']} - {error['url']}")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("使用方法: python log_analyzer.py <log_file>")
        sys.exit(1)
    
    analyzer = LogAnalyzer(sys.argv[1])
    analyzer.generate_report()

项目3:服务器批量管理

开发了一个批量管理多台服务器的工具:

#!/usr/bin/env python3
import paramiko
import threading
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

class ServerManager:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.results = {}
    
    def connect_server(self, server_info):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(
                hostname=server_info['host'],
                username=server_info['username'],
                password=server_info.get('password'),
                key_filename=server_info.get('key_file'),
                timeout=10
            )
            return ssh
        except Exception as e:
            print(f"连接 {server_info['host']} 失败: {e}")
            return None
    
    def execute_command(self, server_info, command):
        ssh = self.connect_server(server_info)
        if ssh:
            try:
                stdin, stdout, stderr = ssh.exec_command(command)
                output = stdout.read().decode('utf-8')
                error = stderr.read().decode('utf-8')
                ssh.close()
                
                return {
                    'host': server_info['host'],
                    'success': True,
                    'output': output,
                    'error': error
                }
            except Exception as e:
                ssh.close()
                return {
                    'host': server_info['host'],
                    'success': False,
                    'error': str(e)
                }
        else:
            return {
                'host': server_info['host'],
                'success': False,
                'error': 'Connection failed'
            }
    
    def batch_execute(self, command, max_workers=5):
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute_command, server, command): server
                for server in self.config['servers']
            }
            
            results = []
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                
                if result['success']:
                    print(f"✅ {result['host']}: 命令执行成功")
                    if result['output']:
                        print(f"   输出: {result['output'].strip()}")
                else:
                    print(f"❌ {result['host']}: {result['error']}")
            
            return results
    
    def get_system_info(self):
        command = "uname -a && free -h && df -h"
        return self.batch_execute(command)
    
    def update_system(self):
        # 根据不同系统执行不同的更新命令
        command = """
        if command -v yum >/dev/null 2>&1; then
            sudo yum update -y
        elif command -v apt >/dev/null 2>&1; then
            sudo apt update && sudo apt upgrade -y
        else
            echo "Unsupported package manager"
        fi
        """
        return self.batch_execute(command)

# 配置文件示例 (servers.json)
config_example = {
    "servers": [
        {
            "host": "192.168.1.10",
            "username": "root",
            "password": "your_password"
        },
        {
            "host": "192.168.1.11",
            "username": "admin",
            "key_file": "/path/to/private_key"
        }
    ]
}

if __name__ == "__main__":
    manager = ServerManager('servers.json')
    
    print("=== 获取系统信息 ===")
    manager.get_system_info()
    
    print("\n=== 执行自定义命令 ===")
    custom_command = input("请输入要执行的命令: ")
    if custom_command:
        manager.batch_execute(custom_command)

学习中的困难和解决

困难1:异常处理

刚开始写脚本时,经常因为没有处理异常而导致程序崩溃。

解决方案:

import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('script.log'),
        logging.StreamHandler()
    ]
)

def safe_execute(func, *args, **kwargs):
    try:
        return func(*args, **kwargs)
    except FileNotFoundError as e:
        logging.error(f"文件未找到: {e}")
    except PermissionError as e:
        logging.error(f"权限不足: {e}")
    except Exception as e:
        logging.error(f"未知错误: {e}")
    return None

困难2:第三方库管理

不同项目需要不同版本的库,容易产生冲突。

解决方案:

# 使用虚拟环境
python -m venv myproject_env
source myproject_env/bin/activate  # Linux/Mac
# myproject_env\Scripts\activate   # Windows

# 管理依赖
pip freeze > requirements.txt
pip install -r requirements.txt

# 使用pipenv(推荐)
pip install pipenv
pipenv install requests
pipenv shell

困难3:代码组织

随着脚本变得复杂,代码组织成为问题。

解决方案:

# 项目结构
project/
├── main.py
├── config/
│   ├── __init__.py
│   └── settings.py
├── utils/
│   ├── __init__.py
│   ├── system.py
│   └── network.py
├── logs/
└── requirements.txt

# 模块化设计
# utils/system.py
def get_cpu_usage():
    import psutil
    return psutil.cpu_percent()

def get_memory_usage():
    import psutil
    return psutil.virtual_memory().percent

# main.py
from utils.system import get_cpu_usage, get_memory_usage
from config.settings import THRESHOLDS

def main():
    cpu = get_cpu_usage()
    memory = get_memory_usage()
    
    if cpu > THRESHOLDS['cpu']:
        print(f"CPU使用率过高: {cpu}%")

常用工具和技巧

调试技巧

使用pdb调试器

import pdb

def problematic_function(data):
    pdb.set_trace()  # 设置断点
    result = process_data(data)
    return result

# 或者在命令行中运行
# python -m pdb script.py

日志调试

import logging

# 详细的日志配置
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)

logger = logging.getLogger(__name__)

def my_function():
    logger.debug("函数开始执行")
    logger.info("处理数据中...")
    logger.warning("发现潜在问题")
    logger.error("发生错误")

性能优化

使用生成器节省内存

# 不好的做法 - 占用大量内存
def read_large_file_bad(filename):
    with open(filename, 'r') as f:
        return f.readlines()  # 一次性读取所有行

# 好的做法 - 使用生成器
def read_large_file_good(filename):
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()

# 使用
for line in read_large_file_good('huge_log.txt'):
    if 'ERROR' in line:
        print(line)

并发处理

import concurrent.futures
import requests

def check_url(url):
    try:
        response = requests.get(url, timeout=5)
        return f"{url}: {response.status_code}"
    except Exception as e:
        return f"{url}: Error - {e}"

urls = ['http://example.com', 'http://google.com', 'http://github.com']

# 并发检查多个URL
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(check_url, urls))
    for result in results:
        print(result)

实用工具函数

import functools
import time
from pathlib import Path

# 计时装饰器
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 执行时间: {end - start:.2f}秒")
        return result
    return wrapper

# 重试装饰器
def retry(max_attempts=3, delay=1):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise e
                    print(f"第{attempt + 1}次尝试失败: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator

# 文件备份函数
def backup_file(file_path):
    path = Path(file_path)
    if path.exists():
        backup_path = path.with_suffix(f'.backup.{int(time.time())}')
        path.rename(backup_path)
        print(f"文件已备份到: {backup_path}")
        return backup_path
    return None

# 配置文件读取
def load_config(config_file, default_config=None):
    import json
    try:
        with open(config_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        if default_config:
            with open(config_file, 'w') as f:
                json.dump(default_config, f, indent=2)
            return default_config
        raise

学习心得

1. 从实际需求出发

每个脚本都应该解决具体的问题,不要为了学习而学习。

2. 重视代码质量

  • 使用有意义的变量名
  • 添加适当的注释
  • 遵循PEP 8编码规范
  • 编写单元测试

3. 持续重构优化

定期回顾和改进代码,提高可读性和可维护性。

4. 学会使用工具

  • IDE: PyCharm, VS Code
  • 调试: pdb, logging
  • 测试: pytest, unittest
  • 代码质量: flake8, black, mypy

5. 关注安全性

# 避免SQL注入
import sqlite3
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))

# 避免命令注入
import shlex
safe_command = shlex.quote(user_input)

# 处理敏感信息
import os
password = os.environ.get('DB_PASSWORD')  # 从环境变量读取

当前水平

经过几个月的学习,我现在能够:

  • ✅ 熟练使用Python基础语法和标准库
  • ✅ 编写复杂的自动化脚本解决实际问题
  • ✅ 进行错误处理和日志记录
  • ✅ 使用第三方库扩展功能
  • ✅ 编写可维护和可扩展的代码

下一步计划

  • 深入学习Python高级特性(装饰器、元类、异步编程)
  • 学习Web框架开发(Flask、Django)
  • 掌握数据库操作和ORM
  • 了解机器学习和数据科学应用
  • 参与开源项目贡献代码

总结

Python自动化脚本学习是一个循序渐进的过程,需要大量的实践和积累。从最初的语法学习到现在能够编写复杂的自动化工具,这个过程让我深刻体会到了Python的强大和灵活。

最重要的是要保持学习的热情,遇到问题时多思考、多查资料、多实践。每解决一个实际问题,都会让你对Python有更深的理解。

记住:Python不仅仅是一门编程语言,更是一种解决问题的思维方式和工具。

目录大纲

下列每一节都提供了完整的小节内容,方便从侧栏快速定位:

一、学习背景

  • 为什么学习Python自动化
  • 学习目标和规划
  • 开发环境搭建

二、学习路径

  • Python基础语法(变量、控制流、函数)
  • 标准库应用(系统操作、文件处理、网络编程)
  • 数据处理技能(JSON、CSV、正则表达式)

三、实际项目

  • 系统监控脚本(资源监控与告警)
  • 日志分析工具(Nginx日志分析)
  • 服务器批量管理(SSH批量操作)

四、困难与解决

  • 异常处理机制
  • 第三方库管理
  • 代码组织结构

五、工具技巧

  • 调试技巧(pdb、logging)
  • 性能优化(生成器、并发)
  • 实用工具函数

六、心得与规划

  • 学习方法总结
  • 当前技能水平评估
  • 未来学习路线规划