编程学习
5分钟阅读
Python自动化脚本学习笔记
Python自动化脚本学习笔记
学习背景
作为一名运维工程师,日常工作中面临着大量重复性的任务:服务器状态检查、日志文件分析、批量配置更新、定时任务执行等。手动操作不仅效率低下,还容易出错。于是我决定学习Python来实现工作自动化,提高工作效率和准确性。
学习路径
第一阶段:Python基础语法
刚开始接触Python时,被它简洁优雅的语法所吸引。相比其他编程语言,Python更接近自然语言,学习起来相对容易。
变量和数据类型
# 基础数据类型
name = "运维工程师" # 字符串
age = 28 # 整数
salary = 15000.5 # 浮点数
is_working = True # 布尔值
# 数据结构
servers = ["web1", "web2", "db1"] # 列表
server_info = {"ip": "192.168.1.100", "port": 22} # 字典
coordinates = (116.4074, 39.9042) # 元组
unique_ips = {"192.168.1.1", "192.168.1.2"} # 集合
控制流语句
# 条件判断
if cpu_usage > 80:
print("CPU使用率过高,需要检查")
elif cpu_usage > 60:
print("CPU使用率较高,需要关注")
else:
print("CPU使用率正常")
# 循环处理
for server in servers:
print(f"正在检查服务器: {server}")
# 字典遍历
for key, value in server_info.items():
print(f"{key}: {value}")
第二阶段:标准库应用
掌握基础语法后,开始学习Python强大的标准库。
系统操作
import os
import sys
import subprocess
# 获取系统信息
print(f"操作系统: {os.name}")
print(f"当前目录: {os.getcwd()}")
print(f"Python版本: {sys.version}")
# 执行系统命令
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
# 环境变量操作
home_dir = os.environ.get('HOME', '/tmp')
os.environ['MY_VAR'] = 'my_value'
文件操作
from pathlib import Path
import shutil
# 路径操作
log_dir = Path('/var/log')
if log_dir.exists():
for log_file in log_dir.glob('*.log'):
print(f"日志文件: {log_file}")
# 文件读写
with open('/etc/passwd', 'r') as f:
users = f.readlines()
print(f"系统用户数: {len(users)}")
# 文件复制和移动
shutil.copy2('source.txt', 'backup.txt')
shutil.move('old_location.txt', 'new_location.txt')
第三阶段:网络和数据处理
网络编程
import requests
import json
import socket
# HTTP请求
response = requests.get('http://httpbin.org/ip')
if response.status_code == 200:
data = response.json()
print(f"公网IP: {data['origin']}")
# 端口检测
def check_port(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
if check_port('www.baidu.com', 80):
print("端口80开放")
数据处理
import json
import csv
import re
from datetime import datetime
# JSON处理
config = {
"servers": ["web1", "web2"],
"database": {"host": "db1", "port": 3306}
}
with open('config.json', 'w') as f:
json.dump(config, f, indent=2)
# CSV处理
with open('servers.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['hostname', 'ip', 'status'])
writer.writerow(['web1', '192.168.1.10', 'running'])
# 正则表达式
log_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)'
log_line = "2024-01-15 10:30:45 [ERROR] Database connection failed"
match = re.match(log_pattern, log_line)
if match:
timestamp, level, message = match.groups()
print(f"时间: {timestamp}, 级别: {level}, 消息: {message}")
实际项目经验
项目1:系统监控脚本
第一个完整的Python项目是写一个系统监控脚本。
系统资源监控
#!/usr/bin/env python3
import psutil
import time
import json
from datetime import datetime
class SystemMonitor:
def __init__(self):
self.thresholds = {
'cpu': 80,
'memory': 85,
'disk': 90
}
def get_cpu_usage(self):
return psutil.cpu_percent(interval=1)
def get_memory_usage(self):
memory = psutil.virtual_memory()
return memory.percent
def get_disk_usage(self, path='/'):
disk = psutil.disk_usage(path)
return (disk.used / disk.total) * 100
def check_system(self):
cpu = self.get_cpu_usage()
memory = self.get_memory_usage()
disk = self.get_disk_usage()
status = {
'timestamp': datetime.now().isoformat(),
'cpu_usage': cpu,
'memory_usage': memory,
'disk_usage': disk,
'alerts': []
}
if cpu > self.thresholds['cpu']:
status['alerts'].append(f"CPU使用率过高: {cpu:.1f}%")
if memory > self.thresholds['memory']:
status['alerts'].append(f"内存使用率过高: {memory:.1f}%")
if disk > self.thresholds['disk']:
status['alerts'].append(f"磁盘使用率过高: {disk:.1f}%")
return status
if __name__ == "__main__":
monitor = SystemMonitor()
while True:
status = monitor.check_system()
print(json.dumps(status, indent=2, ensure_ascii=False))
if status['alerts']:
# 这里可以添加告警逻辑,如发送邮件或钉钉消息
print("⚠️ 系统告警:")
for alert in status['alerts']:
print(f" {alert}")
time.sleep(60) # 每分钟检查一次
项目2:日志分析工具
编写了一个分析Nginx访问日志的工具:
#!/usr/bin/env python3
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
self.log_pattern = re.compile(
r'(\S+) - - \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\d+) "([^"]*)" "([^"]*)"'
)
def parse_log_line(self, line):
match = self.log_pattern.match(line)
if match:
return {
'ip': match.group(1),
'timestamp': match.group(2),
'method': match.group(3),
'url': match.group(4),
'protocol': match.group(5),
'status': int(match.group(6)),
'size': int(match.group(7)),
'referer': match.group(8),
'user_agent': match.group(9)
}
return None
def analyze(self):
ip_counter = Counter()
status_counter = Counter()
url_counter = Counter()
error_logs = []
with open(self.log_file, 'r') as f:
for line_num, line in enumerate(f, 1):
log_entry = self.parse_log_line(line.strip())
if log_entry:
ip_counter[log_entry['ip']] += 1
status_counter[log_entry['status']] += 1
url_counter[log_entry['url']] += 1
# 收集错误日志
if log_entry['status'] >= 400:
error_logs.append({
'line': line_num,
'ip': log_entry['ip'],
'status': log_entry['status'],
'url': log_entry['url'],
'timestamp': log_entry['timestamp']
})
return {
'top_ips': ip_counter.most_common(10),
'status_codes': dict(status_counter),
'top_urls': url_counter.most_common(10),
'error_logs': error_logs[:20] # 只显示前20个错误
}
def generate_report(self):
results = self.analyze()
print("=== Nginx日志分析报告 ===\n")
print("🔥 访问量最高的IP地址:")
for ip, count in results['top_ips']:
print(f" {ip}: {count} 次访问")
print("\n📊 HTTP状态码分布:")
for status, count in sorted(results['status_codes'].items()):
print(f" {status}: {count} 次")
print("\n🌐 访问量最高的URL:")
for url, count in results['top_urls']:
print(f" {url}: {count} 次访问")
if results['error_logs']:
print("\n❌ 错误日志 (最近20条):")
for error in results['error_logs']:
print(f" 行{error['line']}: {error['ip']} - {error['status']} - {error['url']}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print("使用方法: python log_analyzer.py <log_file>")
sys.exit(1)
analyzer = LogAnalyzer(sys.argv[1])
analyzer.generate_report()
项目3:服务器批量管理
开发了一个批量管理多台服务器的工具:
#!/usr/bin/env python3
import paramiko
import threading
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
class ServerManager:
def __init__(self, config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.results = {}
def connect_server(self, server_info):
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
hostname=server_info['host'],
username=server_info['username'],
password=server_info.get('password'),
key_filename=server_info.get('key_file'),
timeout=10
)
return ssh
except Exception as e:
print(f"连接 {server_info['host']} 失败: {e}")
return None
def execute_command(self, server_info, command):
ssh = self.connect_server(server_info)
if ssh:
try:
stdin, stdout, stderr = ssh.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
ssh.close()
return {
'host': server_info['host'],
'success': True,
'output': output,
'error': error
}
except Exception as e:
ssh.close()
return {
'host': server_info['host'],
'success': False,
'error': str(e)
}
else:
return {
'host': server_info['host'],
'success': False,
'error': 'Connection failed'
}
def batch_execute(self, command, max_workers=5):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.execute_command, server, command): server
for server in self.config['servers']
}
results = []
for future in as_completed(futures):
result = future.result()
results.append(result)
if result['success']:
print(f"✅ {result['host']}: 命令执行成功")
if result['output']:
print(f" 输出: {result['output'].strip()}")
else:
print(f"❌ {result['host']}: {result['error']}")
return results
def get_system_info(self):
command = "uname -a && free -h && df -h"
return self.batch_execute(command)
def update_system(self):
# 根据不同系统执行不同的更新命令
command = """
if command -v yum >/dev/null 2>&1; then
sudo yum update -y
elif command -v apt >/dev/null 2>&1; then
sudo apt update && sudo apt upgrade -y
else
echo "Unsupported package manager"
fi
"""
return self.batch_execute(command)
# 配置文件示例 (servers.json)
config_example = {
"servers": [
{
"host": "192.168.1.10",
"username": "root",
"password": "your_password"
},
{
"host": "192.168.1.11",
"username": "admin",
"key_file": "/path/to/private_key"
}
]
}
if __name__ == "__main__":
manager = ServerManager('servers.json')
print("=== 获取系统信息 ===")
manager.get_system_info()
print("\n=== 执行自定义命令 ===")
custom_command = input("请输入要执行的命令: ")
if custom_command:
manager.batch_execute(custom_command)
学习中的困难和解决
困难1:异常处理
刚开始写脚本时,经常因为没有处理异常而导致程序崩溃。
解决方案:
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('script.log'),
logging.StreamHandler()
]
)
def safe_execute(func, *args, **kwargs):
try:
return func(*args, **kwargs)
except FileNotFoundError as e:
logging.error(f"文件未找到: {e}")
except PermissionError as e:
logging.error(f"权限不足: {e}")
except Exception as e:
logging.error(f"未知错误: {e}")
return None
困难2:第三方库管理
不同项目需要不同版本的库,容易产生冲突。
解决方案:
# 使用虚拟环境
python -m venv myproject_env
source myproject_env/bin/activate # Linux/Mac
# myproject_env\Scripts\activate # Windows
# 管理依赖
pip freeze > requirements.txt
pip install -r requirements.txt
# 使用pipenv(推荐)
pip install pipenv
pipenv install requests
pipenv shell
困难3:代码组织
随着脚本变得复杂,代码组织成为问题。
解决方案:
# 项目结构
project/
├── main.py
├── config/
│ ├── __init__.py
│ └── settings.py
├── utils/
│ ├── __init__.py
│ ├── system.py
│ └── network.py
├── logs/
└── requirements.txt
# 模块化设计
# utils/system.py
def get_cpu_usage():
import psutil
return psutil.cpu_percent()
def get_memory_usage():
import psutil
return psutil.virtual_memory().percent
# main.py
from utils.system import get_cpu_usage, get_memory_usage
from config.settings import THRESHOLDS
def main():
cpu = get_cpu_usage()
memory = get_memory_usage()
if cpu > THRESHOLDS['cpu']:
print(f"CPU使用率过高: {cpu}%")
常用工具和技巧
调试技巧
使用pdb调试器
import pdb
def problematic_function(data):
pdb.set_trace() # 设置断点
result = process_data(data)
return result
# 或者在命令行中运行
# python -m pdb script.py
日志调试
import logging
# 详细的日志配置
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
logger = logging.getLogger(__name__)
def my_function():
logger.debug("函数开始执行")
logger.info("处理数据中...")
logger.warning("发现潜在问题")
logger.error("发生错误")
性能优化
使用生成器节省内存
# 不好的做法 - 占用大量内存
def read_large_file_bad(filename):
with open(filename, 'r') as f:
return f.readlines() # 一次性读取所有行
# 好的做法 - 使用生成器
def read_large_file_good(filename):
with open(filename, 'r') as f:
for line in f:
yield line.strip()
# 使用
for line in read_large_file_good('huge_log.txt'):
if 'ERROR' in line:
print(line)
并发处理
import concurrent.futures
import requests
def check_url(url):
try:
response = requests.get(url, timeout=5)
return f"{url}: {response.status_code}"
except Exception as e:
return f"{url}: Error - {e}"
urls = ['http://example.com', 'http://google.com', 'http://github.com']
# 并发检查多个URL
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(check_url, urls))
for result in results:
print(result)
实用工具函数
import functools
import time
from pathlib import Path
# 计时装饰器
def timer(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} 执行时间: {end - start:.2f}秒")
return result
return wrapper
# 重试装饰器
def retry(max_attempts=3, delay=1):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise e
print(f"第{attempt + 1}次尝试失败: {e}")
time.sleep(delay)
return wrapper
return decorator
# 文件备份函数
def backup_file(file_path):
path = Path(file_path)
if path.exists():
backup_path = path.with_suffix(f'.backup.{int(time.time())}')
path.rename(backup_path)
print(f"文件已备份到: {backup_path}")
return backup_path
return None
# 配置文件读取
def load_config(config_file, default_config=None):
import json
try:
with open(config_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
if default_config:
with open(config_file, 'w') as f:
json.dump(default_config, f, indent=2)
return default_config
raise
学习心得
1. 从实际需求出发
每个脚本都应该解决具体的问题,不要为了学习而学习。
2. 重视代码质量
- 使用有意义的变量名
- 添加适当的注释
- 遵循PEP 8编码规范
- 编写单元测试
3. 持续重构优化
定期回顾和改进代码,提高可读性和可维护性。
4. 学会使用工具
- IDE: PyCharm, VS Code
- 调试: pdb, logging
- 测试: pytest, unittest
- 代码质量: flake8, black, mypy
5. 关注安全性
# 避免SQL注入
import sqlite3
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
# 避免命令注入
import shlex
safe_command = shlex.quote(user_input)
# 处理敏感信息
import os
password = os.environ.get('DB_PASSWORD') # 从环境变量读取
当前水平
经过几个月的学习,我现在能够:
- ✅ 熟练使用Python基础语法和标准库
- ✅ 编写复杂的自动化脚本解决实际问题
- ✅ 进行错误处理和日志记录
- ✅ 使用第三方库扩展功能
- ✅ 编写可维护和可扩展的代码
下一步计划
- 深入学习Python高级特性(装饰器、元类、异步编程)
- 学习Web框架开发(Flask、Django)
- 掌握数据库操作和ORM
- 了解机器学习和数据科学应用
- 参与开源项目贡献代码
总结
Python自动化脚本学习是一个循序渐进的过程,需要大量的实践和积累。从最初的语法学习到现在能够编写复杂的自动化工具,这个过程让我深刻体会到了Python的强大和灵活。
最重要的是要保持学习的热情,遇到问题时多思考、多查资料、多实践。每解决一个实际问题,都会让你对Python有更深的理解。
记住:Python不仅仅是一门编程语言,更是一种解决问题的思维方式和工具。
目录大纲
下列每一节都提供了完整的小节内容,方便从侧栏快速定位:
一、学习背景
- 为什么学习Python自动化
- 学习目标和规划
- 开发环境搭建
二、学习路径
- Python基础语法(变量、控制流、函数)
- 标准库应用(系统操作、文件处理、网络编程)
- 数据处理技能(JSON、CSV、正则表达式)
三、实际项目
- 系统监控脚本(资源监控与告警)
- 日志分析工具(Nginx日志分析)
- 服务器批量管理(SSH批量操作)
四、困难与解决
- 异常处理机制
- 第三方库管理
- 代码组织结构
五、工具技巧
- 调试技巧(pdb、logging)
- 性能优化(生成器、并发)
- 实用工具函数
六、心得与规划
- 学习方法总结
- 当前技能水平评估
- 未来学习路线规划