使用Python和OpenAPI将云上的安全组规则填写入Excel

暗香疏影 创作者

我们知道,使用CLI可以列出安全组,但只能列出安全组本身的信息;要列出安全组规则,则必须逐个指定安全组ID,无法一次列出全部安全组的规则。因此,如果想把交付物整理成表格清单交给客户,会比较麻烦。这里我们使用Python来实现。

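# List every security group (one row per group) using the named CLI profile.
aliyun --profile CLI-exampleAK1 ecs DescribeSecurityGroups --output cols="VpcId,SecurityGroupId,SecurityGroupName,CreationTime" rows="SecurityGroups.SecurityGroup[]"

# Show the rules of ONE security group; sg-xxxxxxxxx is a placeholder for a real SecurityGroupId.
# (The original line read "aliyun--profile", which the shell treats as an unknown command.)
aliyun --profile CLI-exampleAK1 ecs DescribeSecurityGroupAttribute --SecurityGroupId sg-xxxxxxxxx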

利用Python快速实现

首先到api.aliyun.com打开OpenAPI在线调试页面,找到DescribeSecurityGroupAttribute。
然后在“SDK示例”中选择Python,直接下载Python项目。

https://api.aliyun.com/api/Ecs/2014-05-26/DescribeSecurityGroupAttribute?params={%22RegionId%22:%22cn-beijing%22}

结果:

#################################################
# -*- coding: utf-8 -*-
import sys
import os
import pandas as pd
from typing import List
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_ecs20140526.models import DescribeSecurityGroupAttributeRequest
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions

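class Sample:
    """Read security group IDs from an Excel sheet and export their rules to a new sheet."""

    @staticmethod
    def create_client() -> EcsClient:
        """
        Initialise the ECS client from an AccessKey pair.

        The pair is read from the environment instead of being written into
        this file: a key embedded in source code leaks with every copy of the
        script (repository, chat, blog post). This script only calls
        DescribeSecurityGroupAttribute, so a RAM user restricted to read-only
        ECS access should be sufficient.

        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
                          ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set
        """
        config = Config(
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = 'ecs.ap-southeast-1.aliyuncs.com'  # adjust to the region in use
        return EcsClient(config)

    @staticmethod
    def read_security_group_ids_from_excel(file_path, sheet_name='Sheet1'):
        """
        Read the security group IDs from an Excel file.

        @param file_path: path of the Excel file
        @param sheet_name: name of the worksheet
        @return: list of security group IDs taken from the 'sgid' column
        """
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        # Blank cells are read as NaN; drop them and strip stray whitespace so
        # that no request is sent with a meaningless ID.
        ids = df['sgid'].dropna().astype(str).str.strip()
        return [sg_id for sg_id in ids if sg_id]

    @staticmethod
    def extract_info_from_response(response):
        """
        Pick the reported fields out of a DescribeSecurityGroupAttribute response body.

        NOTE(review): only IpProtocol, PortRange, SourceCidrIp and the rule ID
        are kept. The API also returns each rule's Direction and Policy, and a
        rule's source may be a security group, a prefix list or an IPv6 CIDR
        instead of an IPv4 CIDR; such rules end up with an empty SourceCidrIp.
        InnerAccessPolicy is the group's intra-group policy, not the action of
        an individual rule. Do not read the sheet as the complete effective
        rule set unless those columns are added.

        @param response: response body of DescribeSecurityGroupAttribute
        @return: dict with the group fields and a 'Permissions' list of rule dicts
        """
        # Tolerate a missing rule list instead of failing on attribute access.
        rules = getattr(response.permissions, 'permission', None) or []
        permissions_info = [
            {
                'IpProtocol': rule.ip_protocol,
                'PortRange': rule.port_range,
                'SourceCidrIp': rule.source_cidr_ip,
                'SecurityGroupRuleId': rule.security_group_rule_id
            }
            for rule in rules
        ]
        return {
            'sgid': response.security_group_id,
            'SecurityGroupName': response.security_group_name,
            'InnerAccessPolicy': response.inner_access_policy,
            'Permissions': permissions_info
        }

    @staticmethod
    def write_to_excel(data: List[dict], file_path):
        """
        Write a list of rows to an Excel file.

        @param data: list of rows, each row being a dict
        @param file_path: path of the Excel file
        """
        df = pd.DataFrame(data)
        df.to_excel(file_path, index=False)

    @staticmethod
    def main(args: List[str]) -> None:
        """Query every security group listed in the input sheet and write one row per rule."""
        client = Sample.create_client()
        excel_file_path = 'D:\\Projects\\sg-list-rule\\sg-list.xlsx'  # replace with your Excel file path
        security_group_ids = Sample.read_security_group_ids_from_excel(excel_file_path)

        processed_data = []  # one dict per rule
        failed_ids = []      # groups whose rules could not be retrieved

        for sg_id in security_group_ids:
            request = DescribeSecurityGroupAttributeRequest(
                region_id='ap-southeast-1',  # replace with your region ID
                security_group_id=sg_id
            )
            try:
                response = client.describe_security_group_attribute(request)
                sg_info = Sample.extract_info_from_response(response.body)
            except Exception as error:
                # Keep going, but remember the failure: a sheet that silently
                # lacks a group looks like a group without rules.
                print(f"Error while processing security group {sg_id}: {error}")
                failed_ids.append(sg_id)
                continue

            # One output row per rule, repeating the group-level fields.
            for permission in sg_info['Permissions']:
                processed_data.append({
                    'sgid': sg_info['sgid'],
                    'SecurityGroupName': sg_info['SecurityGroupName'],
                    'InnerAccessPolicy': sg_info['InnerAccessPolicy'],
                    'IpProtocol': permission['IpProtocol'],
                    'PortRange': permission['PortRange'],
                    'SourceCidrIp': permission['SourceCidrIp'],
                    'SecurityGroupRuleId': permission['SecurityGroupRuleId']
                })

        # Fix the column order of the output sheet.
        columns = ['sgid', 'SecurityGroupName', 'InnerAccessPolicy', 'IpProtocol', 'PortRange', 'SourceCidrIp', 'SecurityGroupRuleId']
        df = pd.DataFrame(processed_data, columns=columns)

        # Write the Excel file.
        output_file_path = 'D:\\Projects\\sg-list-rule\\updated_sg_info.xlsx'  # output file name
        df.to_excel(output_file_path, index=False)
        print(f"Data has been successfully written to {output_file_path}")
        if failed_ids:
            print(f"Security groups that could not be retrieved ({len(failed_ids)}): {', '.join(failed_ids)}")


if __name__ == '__main__':
    Sample.main(sys.argv[1:])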

源表格: sg-list.xlsx

sgid
sg-11111
sg-22222

目标表格:updated_sg_info.xlsx

sgid SecurityGroupName InnerAccessPolicy IpProtocol PortRange SourceCidrIp SecurityGroupRuleId
sg-11111 sgName1 Accept TCP 3389/3389 0.0.0.0/0 sgr-11111
sg-11111 sgName2 Accept TCP 8080/8083 0.0.0.0/0 sgr-11112
sg-22222 sgName3 Accept UDP 51/51 0.0.0.0/0 sgr-11113

其他老版本python

读取安全组输出到Terminal (第一版)

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import os
import sys
import pandas as pd

from typing import List

from alibabacloud_ecs20140526.client import Client as Ecs20140526Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecs20140526 import models as ecs_20140526_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient


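class Sample:
    """First version: print the raw DescribeSecurityGroupAttribute response for each listed group."""

    # Workbook whose 'sgid' column lists the security groups to query.
    excel_file_path = 'D:\\Projects\\sg-list-rule\\sg-list.xlsx'

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Ecs20140526Client:
        """
        Initialise the ECS client from an AccessKey pair.

        @return: Client
        @throws KeyError: if either environment variable below is not set
        """
        # A leaked project can leak the AccessKey and endanger every resource
        # in the account, so the key is never written into the code. STS is the
        # safer option; for other authentication methods see
        # https://help.aliyun.com/document_detail/378659.html.
        config = open_api_models.Config(
            # Required: the runtime environment must define ALIBABA_CLOUD_ACCESS_KEY_ID.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required: the runtime environment must define ALIBABA_CLOUD_ACCESS_KEY_SECRET.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # For endpoints see https://api.aliyun.com/product/Ecs
        config.endpoint = 'ecs.ap-southeast-1.aliyuncs.com'
        return Ecs20140526Client(config)

    @staticmethod
    def read_security_group_from_excel(file_path, sheet_name='Sheet1'):
        """
        Return the values of the 'sgid' column of the given worksheet as a list.

        Declared as a static method so the call through the class keeps
        working and a call through an instance does not pass `self` as
        file_path.
        """
        # Read the Excel file into a pandas DataFrame; row 0 holds the header.
        df = pd.read_excel(file_path, sheet_name=sheet_name, header=0)
        return df['sgid'].tolist()

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Query each listed security group and print the response to the terminal."""
        client = Sample.create_client()
        security_group_ids = Sample.read_security_group_from_excel(Sample.excel_file_path)

        for sg_id in security_group_ids:
            describe_security_group_attribute_request = ecs_20140526_models.DescribeSecurityGroupAttributeRequest(
                region_id='ap-southeast-1',
                security_group_id=sg_id
            )
            runtime = util_models.RuntimeOptions()

            try:
                response = client.describe_security_group_attribute_with_options(
                    describe_security_group_attribute_request, runtime)

                # Print or process the response for the current security group
                print(f"Security Group ID: {sg_id}")
                print(response)  # Replace with your desired way of handling the response
            except Exception as error:
                print(f"Error while processing security group {sg_id}:")
                # Only SDK errors are known to carry .message and .data; any
                # other exception would raise AttributeError here and hide the
                # real failure, so fall back to str(error) and an empty dict.
                message = getattr(error, 'message', None) or str(error)
                data = getattr(error, 'data', None) or {}
                print(message)
                print(data.get("Recommend"))
                UtilClient.assert_as_string(message)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])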

将安全组规则输出到Terminal (第二版)

# -*- coding: utf-8 -*-
import sys
import os
import pandas as pd
from typing import List
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_ecs20140526.models import DescribeSecurityGroupAttributeRequest
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions

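class Sample:
    """Second version: collect the rules per security group and write them to Excel."""

    @staticmethod
    def create_client() -> EcsClient:
        """
        Initialise the ECS client from an AccessKey pair.

        The pair is read from the environment rather than stored as a literal:
        a key written into the script leaks with every copy of the file.

        @return: Client
        @throws KeyError: if ALIBABA_CLOUD_ACCESS_KEY_ID or
                          ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set
        """
        config = Config(
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        config.endpoint = 'ecs.ap-southeast-1.aliyuncs.com'  # adjust to the region in use
        return EcsClient(config)

    @staticmethod
    def read_security_group_ids_from_excel(file_path, sheet_name='Sheet1'):
        """
        Read the security group IDs from an Excel file.

        @param file_path: path of the Excel file
        @param sheet_name: name of the worksheet
        @return: list of security group IDs taken from the 'sgid' column
        """
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        # Blank cells are read as NaN; skip them instead of querying the API
        # with a meaningless ID.
        ids = df['sgid'].dropna().astype(str).str.strip()
        return [sg_id for sg_id in ids if sg_id]

    @staticmethod
    def extract_info_from_response(response):
        """
        Pick the reported fields out of a DescribeSecurityGroupAttribute response body.

        NOTE(review): a rule's Direction and Policy are not recorded, and a
        rule whose source is not an IPv4 CIDR is listed with an empty
        SourceCidrIp.

        @param response: response body of DescribeSecurityGroupAttribute
        @return: dict with 'sgid', 'InnerAccessPolicy' and a 'Permissions' list
        """
        # Tolerate a missing rule list instead of failing on attribute access.
        rules = getattr(response.permissions, 'permission', None) or []
        permissions_info = [
            {
                'IpProtocol': rule.ip_protocol,
                'PortRange': rule.port_range,
                'SourceCidrIp': rule.source_cidr_ip,
                'SecurityGroupRuleId': rule.security_group_rule_id
            }
            for rule in rules
        ]
        return {
            'sgid': response.security_group_id,
            'InnerAccessPolicy': response.inner_access_policy,
            'Permissions': permissions_info
        }

    @staticmethod
    def write_to_excel(data: List[dict], file_path):
        """
        Write a list of rows to an Excel file.

        @param data: list of rows, each row being a dict
        @param file_path: path of the Excel file
        """
        df = pd.DataFrame(data)
        df.to_excel(file_path, index=False)

    @staticmethod
    def main(args: List[str]) -> None:
        """Query every listed security group and write one row per group.

        The 'Permissions' value of each row is the whole list of rule dicts,
        so it lands in a single cell.
        """
        client = Sample.create_client()
        excel_file_path = 'D:\\Projects\\sg-list-rule\\sg-list.xlsx'  # replace with your Excel file path
        security_group_ids = Sample.read_security_group_ids_from_excel(excel_file_path)

        extracted_data = []
        failed_ids = []  # groups whose rules could not be retrieved
        for sg_id in security_group_ids:
            request = DescribeSecurityGroupAttributeRequest(
                region_id='ap-southeast-1',  # replace with your region ID
                security_group_id=sg_id
            )
            try:
                response = client.describe_security_group_attribute(request)
                extracted_data.append(Sample.extract_info_from_response(response.body))
            except Exception as error:
                # Keep going, but remember the failure so a missing group is
                # not mistaken for a group without rules.
                print(f"Error while processing security group {sg_id}: {error}")
                failed_ids.append(sg_id)

        output_file_path = 'D:\\Projects\\sg-list-rule\\updated_sg_info.xlsx'  # output file name
        Sample.write_to_excel(extracted_data, output_file_path)
        print(f"Data has been successfully written to {output_file_path}")
        if failed_ids:
            print(f"Security groups that could not be retrieved ({len(failed_ids)}): {', '.join(failed_ids)}")


if __name__ == '__main__':
    Sample.main(sys.argv[1:])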

其他使用Python实现的查询

无影云桌面

无影云桌面无法使用CLI,只能通过SDK实现。以下示例将云电脑ecd-cewiwx4tv4hy0eba6授权给用户endusername使用。

# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import os
import sys

from typing import List

from alibabacloud_ecd20200930.client import Client as ecd20200930Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ecd20200930 import models as ecd_20200930_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient


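class Sample:
    """Grant an end user access to a cloud desktop through ModifyUserEntitlement."""

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> ecd20200930Client:
        """
        Initialise the ECD client from an AccessKey pair.

        @return: Client
        @throws KeyError: if either environment variable below is not set
        """
        # A leaked project can leak the AccessKey and endanger every resource
        # in the account, so the key is never written into the code. STS is the
        # safer option; for other authentication methods see
        # https://help.aliyun.com/document_detail/378659.html.
        # NOTE(review): this script changes who may use a desktop, so the key
        # should belong to an identity limited to that operation.
        config = open_api_models.Config(
            # Required: the runtime environment must define ALIBABA_CLOUD_ACCESS_KEY_ID.
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # Required: the runtime environment must define ALIBABA_CLOUD_ACCESS_KEY_SECRET.
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # For endpoints see https://api.aliyun.com/product/ecd
        config.endpoint = 'ecd.ap-southeast-1.aliyuncs.com'
        return ecd20200930Client(config)

    @staticmethod
    def _build_request():
        """Build the entitlement request shared by main() and main_async()."""
        return ecd_20200930_models.ModifyUserEntitlementRequest(
            region_id='ap-southeast-1',
            # Array, required: list of user IDs (i.e. user names).
            end_user_id=[
                'endusername'
            ],
            # Array, required: IDs of the cloud desktops to grant to those users.
            authorize_desktop_id=[
                'ecd-cewiwx4tv4hy0eba6'
            ]
        )

    @staticmethod
    def _report_error(error) -> None:
        """Print the error message and the diagnostic link, then assert the message is a string."""
        # Only SDK errors are known to carry .message and .data; any other
        # exception would raise AttributeError here and hide the real failure,
        # so fall back to str(error) and an empty dict.
        message = getattr(error, 'message', None) or str(error)
        data = getattr(error, 'data', None) or {}
        # Error message
        print(message)
        # Diagnostic URL
        print(data.get("Recommend"))
        UtilClient.assert_as_string(message)

    @staticmethod
    def main(
        args: List[str],
    ) -> None:
        """Send the entitlement request synchronously and log the response as JSON."""
        client = Sample.create_client()
        modify_user_entitlement_request = Sample._build_request()
        runtime = util_models.RuntimeOptions()
        try:
            resp = client.modify_user_entitlement_with_options(modify_user_entitlement_request, runtime)
            ConsoleClient.log(UtilClient.to_jsonstring(resp))
        except Exception as error:
            # Printed for demonstration only; handle exceptions carefully and
            # never silently ignore them in a real project.
            Sample._report_error(error)

    @staticmethod
    async def main_async(
        args: List[str],
    ) -> None:
        """Asynchronous variant of main()."""
        client = Sample.create_client()
        modify_user_entitlement_request = Sample._build_request()
        runtime = util_models.RuntimeOptions()
        try:
            resp = await client.modify_user_entitlement_with_options_async(modify_user_entitlement_request, runtime)
            ConsoleClient.log(UtilClient.to_jsonstring(resp))
        except Exception as error:
            # Printed for demonstration only; handle exceptions carefully and
            # never silently ignore them in a real project.
            Sample._report_error(error)


if __name__ == '__main__':
    Sample.main(sys.argv[1:])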

  • 标题: 使用Python和OpenAPI将云上的安全组规则填写入Excel
  • 作者: 暗香疏影
  • 创建于 : 2024-04-24 22:23:00
  • 更新于 : 2024-04-24 22:25:54
  • 链接: https://blog.23ikr.com/2024/04/24/2024-04-24-Python-List-SecurityGroupRule/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。