1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
import sys import os import pandas as pd from typing import List from alibabacloud_ecs20140526.client import Client as EcsClient from alibabacloud_ecs20140526.models import DescribeSecurityGroupAttributeRequest from alibabacloud_tea_openapi.models import Config from alibabacloud_tea_util.models import RuntimeOptions
class Sample: @staticmethod def create_client() -> EcsClient: """ 使用AK&SK初始化账号Client @return: Client @throws Exception """ config = Config( access_key_id='xxxxxxxxx', access_key_secret='xxxxxxxxxxx' ) config.endpoint = 'ecs.ap-southeast-1.aliyuncs.com' return EcsClient(config)
@staticmethod def read_security_group_ids_from_excel(file_path, sheet_name='Sheet1'): """ 从Excel文件中读取安全组ID @param file_path: Excel文件路径 @param sheet_name: Excel工作表名称 @return: 包含安全组ID的列表 """ df = pd.read_excel(file_path, sheet_name=sheet_name) security_group_ids = df['sgid'].tolist() return security_group_ids
@staticmethod def extract_info_from_response(response): """ 从API响应中提取关键信息 """ permissions_info = [] for permission in response.permissions.permission: permissions_info.append({ 'IpProtocol': permission.ip_protocol, 'PortRange': permission.port_range, 'SourceCidrIp': permission.source_cidr_ip, 'SecurityGroupRuleId': permission.security_group_rule_id }) return { 'sgid': response.security_group_id, 'SecurityGroupName': response.security_group_name, 'InnerAccessPolicy': response.inner_access_policy, 'Permissions': permissions_info }
@staticmethod def write_to_excel(data: List[dict], file_path): """ 将数据列表写入Excel文件 @param data: 数据列表,其中每个元素是一个字典 @param file_path: Excel文件路径 """ df = pd.DataFrame(data) df.to_excel(file_path, index=False)
@staticmethod def main(args: List[str]) -> None: client = Sample.create_client() excel_file_path = 'D:\\Projects\\sg-list-rule\\sg-list.xlsx' security_group_ids = Sample.read_security_group_ids_from_excel(excel_file_path)
processed_data = []
for sg_id in security_group_ids: request = DescribeSecurityGroupAttributeRequest( region_id='ap-southeast-1', security_group_id=sg_id ) try: response = client.describe_security_group_attribute(request) response_body = response.body sg_info = Sample.extract_info_from_response(response_body)
for permission in sg_info['Permissions']: processed_data.append({ 'sgid': sg_info['sgid'], 'SecurityGroupName': sg_info['SecurityGroupName'], 'InnerAccessPolicy': sg_info['InnerAccessPolicy'], 'IpProtocol': permission['IpProtocol'], 'PortRange': permission['PortRange'], 'SourceCidrIp': permission['SourceCidrIp'], 'SecurityGroupRuleId': permission['SecurityGroupRuleId'] })
except Exception as error: print(f"Error while processing security group {sg_id}: {error}")
columns = ['sgid', 'SecurityGroupName','InnerAccessPolicy', 'IpProtocol', 'PortRange', 'SourceCidrIp', 'SecurityGroupRuleId'] df = pd.DataFrame(processed_data, columns=columns)
output_file_path = 'D:\\Projects\\sg-list-rule\\updated_sg_info.xlsx' df.to_excel(output_file_path, index=False) print(f"Data has been successfully written to {output_file_path}")
if __name__ == '__main__': Sample.main(sys.argv[1:])
|