日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

使用python sdk添加刪除阿里云pvc路由

作者:lv2 更新時(shí)間: 2023-07-07 編程語(yǔ)言

1. 前言

由于線路供應(yīng)商sdwan存在單點(diǎn)問(wèn)題,需要實(shí)現(xiàn)線路高可用解決方案,需要設(shè)計(jì)自動(dòng)切換阿里云vpc路由解決方案。通過(guò)阿里云文檔了解,可通過(guò)阿里云專有網(wǎng)絡(luò)Python SDK,通過(guò)sdk實(shí)現(xiàn)創(chuàng)建、刪除、查詢等vpc網(wǎng)絡(luò)相關(guān)操作(創(chuàng)建/刪除路由表、創(chuàng)建/刪除自定義路由等)。

環(huán)境準(zhǔn)備

  • Python 2.7及3.x環(huán)境(這里使用python3.9.6)
  • 配置阿里云賬號(hào)和訪問(wèn)密鑰(AccessKey),權(quán)限:AliyunVPCFullAccess
  • 安裝Alibaba Cloud SDK for Python

2. 安裝

安裝Alibaba Cloud SDK

# 安裝模塊
pip install alibabacloud_tea_openapi
pip install alibabacloud_vpc20160428==2.0.0
pip install aliyun-python-sdk-vpc==3.0.25

# 安裝sdk示例庫(kù),完成環(huán)境初始化
git clone https://github.com/aliyun/aliyun-openapi-python-sdk-examples.git
cd aliyun-openapi-python-sdk-examples/sdk_examples/
python setup.py install

安裝效果如下

3. 使用方法?

在解壓的目錄.sdk_examples/examples/,可以看到除了vpc的py腳本,還有slb,eip等,根據(jù)需要可自行查看修改,這里使用sdk_examples/examples/vpc/vpc_route_entry.py進(jìn)行修改,原代碼為

#encoding=utf-8
import sys
import json
import time

from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
from aliyunsdkvpc.request.v20160428 import CreateRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DeleteRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DescribeRouteTablesRequest
from sdk_lib.exception import ExceptionHandler
from sdk_lib.check_status import CheckStatus
from sdk_lib.common_util import CommonUtil
from sdk_lib.sdk_vswitch import VSwitch
from sdk_lib.sdk_route_table import RouteTable
from sdk_lib.consts import *

class RouteEntry(object):
    def __init__(self, client):
        self.client = client

    def create_route_entry(self, params):
        """
        create_route_entry: 創(chuàng)建route_entry路由條目
        官網(wǎng)API參考鏈接: https://help.aliyun.com/document_detail/36012.html
        """
        try:
            request = CreateRouteEntryRequest.CreateRouteEntryRequest()
            # 路由表ID
            request.set_RouteTableId(params['route_table_id'])
            # 自定義路由條目的目標(biāo)網(wǎng)段
            request.set_DestinationCidrBlock(params['destination_cidr_block'])
            # 下一跳的類型
            request.set_NextHopType(params['nexthop_type'])
            # 下一跳實(shí)例的ID
            request.set_NextHopId(params['nexthop_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            # 判斷router entry狀態(tài)是否可用
            if CheckStatus.check_status(TIME_DEFAULT_OUT, DEFAULT_TIME,
                                        self.describe_route_entry_status,
                                        AVAILABLE, params['route_table_id']):
                return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def delete_route_entry(self, params):
        """
        delete_route_entry: 刪除route_entry路由條目
        官網(wǎng)API參考鏈接: https://help.aliyun.com/document_detail/36013.html
        """
        try:
            request = DeleteRouteEntryRequest.DeleteRouteEntryRequest()
            # 路由條目所在的路由表的ID
            request.set_RouteTableId(params['route_table_id'])
            # 路由條目的目標(biāo)網(wǎng)段
            request.set_DestinationCidrBlock(params['destination_cidr_block'])
            # 下一跳實(shí)例的ID
            request.set_NextHopId(params['nexthop_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            time.sleep(DEFAULT_TIME)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def describe_route_entry_vrouter(self, params):
        """
        describe_route_entry_vrouter: 查詢r(jià)oute_entry路由條目
        官網(wǎng)API參考鏈接: https://help.aliyun.com/document_detail/36014.html
        """
        try:
            request = DescribeRouteTablesRequest.DescribeRouteTablesRequest()
            # 路由表所屬的VPC路由器或邊界路由器的ID
            request.set_VRouterId(params['vrouter_id'])
            # 路由表的ID
            request.set_RouteTableId(params['route_table_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def describe_route_entry(self, route_table_id):
        """
        describe_route_entry: 查詢r(jià)oute_entry路由條目
        官網(wǎng)API參考鏈接: https://help.aliyun.com/document_detail/36014.html
        """
        try:
            request = DescribeRouteTablesRequest.DescribeRouteTablesRequest()
            request.set_RouteTableId(route_table_id)
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def describe_route_entry_status(self, route_table_id):
        """
        describe_route_entry_status: 查詢r(jià)oute_entry路由條目當(dāng)前狀態(tài)
        """
        response = self.describe_route_entry(route_table_id)
        return response["RouteTables"]["RouteTable"][0]["RouteEntrys"]["RouteEntry"][0]["Status"]

def main():
    client = ACS_CLIENT
    vswitch = VSwitch(client)
    route_table = RouteTable(client)
    route_entry = RouteEntry(client)

    params = {}
    params['route_table_name'] = "sdk_route_table"
    params['destination_cidr_block'] = "0.0.0.0/0"
    params['nexthop_id'] = "i-xxx"
    params['nexthop_type'] = "Instance"

    params['vpc_id'] = "vpc-xxx"
    params['vswitch_id'] = "vsw-xxx"

    #創(chuàng)建route table
    route_table_json = route_table.create_route_table(params)
    CommonUtil.log("create_route_table", route_table_json)

    #查詢vswitch
    vswitch_json = vswitch.describe_vswitch_attribute(params)
    CommonUtil.log("describe_vswitch_attribute", vswitch_json)

    #route table綁定vswitch
    params['route_table_id'] = route_table_json['RouteTableId']
    associate_json = route_table.associate_route_table(params)
    CommonUtil.log("associate_route_table", associate_json)

    #創(chuàng)建路由條目
    create_route_entry_json = route_entry.create_route_entry(params)
    CommonUtil.log("create_route_entry", create_route_entry_json)
    
    #刪除路由條目
    delete_route_entry_json = route_entry.delete_route_entry(params)
    CommonUtil.log("delete_route_entry", delete_route_entry_json)

    #route table解綁vswitch
    unassociate_json = route_table.unassociate_route_table(params)
    CommonUtil.log("unassociate_route_table", unassociate_json)

    #刪除route table
    delete_route_table_json = route_table.delete_route_table(params)
    CommonUtil.log("delete_route_table", delete_route_table_json)


if __name__ == "__main__":
    sys.exit(main())

改為兩個(gè)py腳本,一個(gè)添加路由,一個(gè)刪除路由

添加路由腳本:add_route.py

#encoding=utf-8
import sys
import json
import time

from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
from aliyunsdkvpc.request.v20160428 import CreateRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DeleteRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DescribeRouteTablesRequest
from sdk_lib.exception import ExceptionHandler
from sdk_lib.check_status import CheckStatus
from sdk_lib.common_util import CommonUtil
from sdk_lib.sdk_vswitch import VSwitch
from sdk_lib.sdk_route_table import RouteTable
from sdk_lib.consts import *

class RouteEntry(object):
    def __init__(self, client):
        self.client = client
    
    def create_route_entry(self, params):
        """
        create_route_entry: 創(chuàng)建route_entry路由條目

        """
        try:
            request = CreateRouteEntryRequest.CreateRouteEntryRequest()
            # 路由表ID
            request.set_RouteTableId(params['route_table_id'])
            # 自定義路由條目的目標(biāo)網(wǎng)段
            request.set_DestinationCidrBlock(params['destination_cidr_block'])
            # 下一跳的類型
            request.set_NextHopType(params['nexthop_type'])
            # 下一跳實(shí)例的ID
            request.set_NextHopId(params['nexthop_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            # 判斷router entry狀態(tài)是否可用
            if CheckStatus.check_status(TIME_DEFAULT_OUT, DEFAULT_TIME,
                                        self.describe_route_entry_status,
                                        AVAILABLE, params['route_table_id']):
                return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def describe_route_entry_status(self, route_table_id):
        """
        describe_route_entry_status: 查詢r(jià)oute_entry路由條目當(dāng)前狀態(tài)

        """
        response = self.describe_route_entry(route_table_id)
        return response["RouteTables"]["RouteTable"][0]["RouteEntrys"]["RouteEntry"][0]["Status"]

    def describe_route_entry_vrouter(self, params):
        """
        describe_route_entry_vrouter: 查詢r(jià)oute_entry路由條目

        """
        try:
            request = DescribeRouteTablesRequest.DescribeRouteTablesRequest()
            # 路由表所屬的VPC路由器或邊界路由器的ID
            request.set_VRouterId(params['vrouter_id'])
            # 路由表的ID
            request.set_RouteTableId(params['route_table_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

    def describe_route_entry(self, route_table_id):
        """
        describe_route_entry: 查詢r(jià)oute_entry路由條目

        """
        try:
            request = DescribeRouteTablesRequest.DescribeRouteTablesRequest()
            request.set_RouteTableId(route_table_id)
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

def main():
    # 這里填上上面創(chuàng)建的阿里云帳號(hào)accesskey和secretkey,以及vpc地域
    client = AcsClient(
            '***',
            '***',
            'cn-guangzhou')
    vswitch = VSwitch(client)
    route_table = RouteTable(client)
    route_entry = RouteEntry(client)
    # 變量params參數(shù),根據(jù)需要添加路由,pvc id,switch id和路由表名稱
    params = {}
    params['route_table_name'] = "sdk_route_table"
    params['destination_cidr_block'] = "0.0.0.0/0"
    params['nexthop_id'] = "i-xxx"
    params['nexthop_type'] = "Instance"

    params['vpc_id'] = "vpc-xxx"
    params['vswitch_id'] = "vsw-xxx"


    #創(chuàng)建路由條目
    create_route_entry_json = route_entry.create_route_entry(params)
    CommonUtil.log("create_route_entry", create_route_entry_json)

if __name__ == "__main__":
    sys.exit(main())

使用方法:

python add_route.py

刪除路由腳本:del_route.py

#encoding=utf-8
import sys
import json
import time

from aliyunsdkcore.acs_exception.exceptions import ServerException, ClientException
from aliyunsdkvpc.request.v20160428 import CreateRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DeleteRouteEntryRequest
from aliyunsdkvpc.request.v20160428 import DescribeRouteTablesRequest
from sdk_lib.exception import ExceptionHandler
from sdk_lib.check_status import CheckStatus
from sdk_lib.common_util import CommonUtil
from sdk_lib.sdk_vswitch import VSwitch
from sdk_lib.sdk_route_table import RouteTable
from sdk_lib.consts import *

class RouteEntry(object):
    def __init__(self, client):
        self.client = client
    
    def delete_route_entry(self, params):
        """
        delete_route_entry: 刪除route_entry路由條目

        """
        try:
            request = DeleteRouteEntryRequest.DeleteRouteEntryRequest()
            # 路由條目所在的路由表的ID
            request.set_RouteTableId(params['route_table_id'])
            # 路由條目的目標(biāo)網(wǎng)段
            request.set_DestinationCidrBlock(params['destination_cidr_block'])
            # 下一跳實(shí)例的ID
            request.set_NextHopId(params['nexthop_id'])
            response = self.client.do_action_with_exception(request)
            response_json = json.loads(response)
            time.sleep(DEFAULT_TIME)
            return response_json
        except ServerException as e:
            ExceptionHandler.server_exception(e)
        except ClientException as e:
            ExceptionHandler.client_exception(e)

 

def main():
    # 這里填上上面創(chuàng)建的阿里云帳號(hào)accesskey和secretkey,以及vpc地域
    client = AcsClient(
            '***',
            '***',
            'cn-guangzhou')
    vswitch = VSwitch(client)
    route_table = RouteTable(client)
    route_entry = RouteEntry(client)
    # 變量params參數(shù),根據(jù)需要添加路由,pvc id,switch id和路由表名稱
    params = {}
    params['route_table_name'] = "sdk_route_table"
    params['destination_cidr_block'] = "0.0.0.0/0"
    params['nexthop_id'] = "i-xxx"
    params['nexthop_type'] = "Instance"

    params['vpc_id'] = "vpc-xxx"
    params['vswitch_id'] = "vsw-xxx"


    #刪除路由條目
    delete_route_entry_json = route_entry.delete_route_entry(params)
    CommonUtil.log("delete_route_entry", delete_route_entry_json)

if __name__ == "__main__":
    sys.exit(main())

使用方法:

python del_route.py

原文鏈接:https://blog.csdn.net/manwufeilong/article/details/131477339

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新