1+ import boto3
2+ import datetime
3+ import botocore
4+ import json
5+
6+ s3 = boto3 .client ("s3" )
7+ ecs = boto3 .client ("ecs" )
8+ ec2 = boto3 .client ("ec2" )
9+ cloudwatch = boto3 .client ("cloudwatch" )
10+ sqs = boto3 .client ("sqs" )
11+
12+ bucket = "BUCKET_NAME"
13+
14+
15+ def killdeadAlarms (fleetId , monitorapp , project ):
16+ checkdates = [
17+ datetime .datetime .now ().strftime ("%Y-%m-%d" ),
18+ (datetime .datetime .now () - datetime .timedelta (days = 1 )).strftime ("%Y-%m-%d" ),
19+ ]
20+ todel = []
21+ for eachdate in checkdates :
22+ datedead = ec2 .describe_spot_fleet_request_history (
23+ SpotFleetRequestId = fleetId , StartTime = eachdate
24+ )
25+ for eachevent in datedead ["HistoryRecords" ]:
26+ if eachevent ["EventType" ] == "instanceChange" :
27+ if eachevent ["EventInformation" ]["EventSubType" ] == "terminated" :
28+ todel .append (eachevent ["EventInformation" ]["InstanceId" ])
29+ todel = [f"{ project } _{ x } " for x in todel ]
30+ cloudwatch .delete_alarms (AlarmNames = todel )
31+ print ("Old alarms deleted" )
32+
33+
34+ def seeIfLogExportIsDone (logExportId ):
35+ while True :
36+ result = cloudwatch .describe_export_tasks (taskId = logExportId )
37+ if result ["exportTasks" ][0 ]["status" ]["code" ] != "PENDING" :
38+ if result ["exportTasks" ][0 ]["status" ]["code" ] != "RUNNING" :
39+ print (result ["exportTasks" ][0 ]["status" ]["code" ])
40+ break
41+ time .sleep (30 )
42+
43+
44+ def downscaleSpotFleet (queue , spotFleetID ):
45+ response = sqs .get_queue_url (QueueName = queue )
46+ queueUrl = response ["QueueUrl" ]
47+ response = sqs .get_queue_attributes (
48+ QueueUrl = queueUrl ,
49+ AttributeNames = [
50+ "ApproximateNumberOfMessages" ,
51+ "ApproximateNumberOfMessagesNotVisible" ,
52+ ],
53+ )
54+ visible = int (response ["Attributes" ]["ApproximateNumberOfMessages" ])
55+ nonvisible = int (response ["Attributes" ]["ApproximateNumberOfMessagesNotVisible" ])
56+ status = ec2 .describe_spot_fleet_instances (SpotFleetRequestId = spotFleetID )
57+ if nonvisible < len (status ["ActiveInstances" ]):
58+ result = ec2 .modify_spot_fleet_request (
59+ ExcessCapacityTerminationPolicy = "noTermination" ,
60+ TargetCapacity = str (nonvisible ),
61+ SpotFleetRequestId = spotFleetID ,
62+ )
63+
64+
65+ def lambda_handler (event , lambda_context ):
66+ # Triggered any time SQS queue ApproximateNumberOfMessagesVisible = 0
67+ # OR ApproximateNumberOfMessagesNotVisible = 0
68+ messagestring = event ["Records" ][0 ]["Sns" ]["Message" ]
69+ messagedict = json .loads (messagestring )
70+ queueId = messagedict ["Trigger" ]["Dimensions" ][0 ]["value" ]
71+ project = queueId .rsplit ("_" , 1 )[0 ]
72+
73+ # Download monitor file
74+ monitor_file_name = f"{ queueId .split ('Queue' )[0 ]} SpotFleetRequestId.json"
75+ monitor_local_name = f"/tmp/{ monitor_file_name } "
76+ monitor_on_bucket_name = (
77+ f"/monitors/{ monitor_file_name } "
78+ )
79+
80+ with open (monitor_local_name , "wb" ) as f :
81+ try :
82+ s3 .download_fileobj (bucket , monitor_on_bucket_name , f )
83+ except botocore .exceptions .ClientError as error :
84+ print ("Error retrieving monitor file." )
85+ return
86+ with open (monitor_local_name , "r" ) as input :
87+ monitorInfo = json .load (input )
88+
89+ monitorcluster = monitorInfo ["MONITOR_ECS_CLUSTER" ]
90+ monitorapp = monitorInfo ["MONITOR_APP_NAME" ]
91+ fleetId = monitorInfo ["MONITOR_FLEET_ID" ]
92+ loggroupId = monitorInfo ["MONITOR_LOG_GROUP_NAME" ]
93+ starttime = monitorInfo ["MONITOR_START_TIME" ]
94+ CLEAN_DASHBOARD = monitorInfo ["CLEAN_DASHBOARD" ]
95+ print (f"Monitor triggered for { monitorcluster } { monitorapp } { fleetId } { loggroupId } " )
96+
97+ # If no visible messages, downscale machines
98+ if "ApproximateNumberOfMessagesVisible" in event ["Records" ][0 ]["Sns" ]["Message" ]:
99+ print ("No visible messages. Tidying as we go." )
100+ killdeadAlarms (fleetId , monitorapp , project )
101+ downscaleSpotFleet (queueId , fleetId )
102+
103+ # If no messages in progress, cleanup
104+ if "ApproximateNumberOfMessagesNotVisible" in event ["Records" ][0 ]["Sns" ]["Message" ]:
105+ print ("No messages in progress. Cleaning up." )
106+ ecs .update_service (
107+ cluster = monitorcluster , service = f"{ monitorapp } Service" , desiredCount = 0 ,
108+ )
109+ print ("Service has been downscaled" )
110+
111+ # Delete the alarms from active machines and machines that have died.
112+ active_dictionary = ec2 .describe_spot_fleet_instances (
113+ SpotFleetRequestId = fleetId
114+ )
115+ active_instances = []
116+ for instance in active_dictionary ["ActiveInstances" ]:
117+ active_instances .append (instance ["InstanceId" ])
118+ cloudwatch .delete_alarms (AlarmNames = active_instances )
119+ killdeadAlarms (fleetId , monitorapp , project )
120+
121+ # Read spot fleet id and terminate all EC2 instances
122+ ec2 .cancel_spot_fleet_requests (
123+ SpotFleetRequestIds = [fleetId ], TerminateInstances = True
124+ )
125+ print ("Fleet shut down." )
126+
127+ # Remove SQS queue, ECS Task Definition, ECS Service
128+ ECS_TASK_NAME = monitorapp + "Task"
129+ ECS_SERVICE_NAME = monitorapp + "Service"
130+
131+ print ("Deleting existing queue." )
132+ queueoutput = sqs .list_queues (QueueNamePrefix = queueId )
133+ try :
134+ if len (queueoutput ["QueueUrls" ]) == 1 :
135+ queueUrl = queueoutput ["QueueUrls" ][0 ]
136+ else : # In case we have "AnalysisQueue" and "AnalysisQueue1" and only want to delete the first of those
137+ for eachUrl in queueoutput ["QueueUrls" ]:
138+ if eachUrl .split ("/" )[- 1 ] == queueName :
139+ queueUrl = eachUrl
140+ sqs .delete_queue (QueueUrl = queueUrl )
141+ except KeyError :
142+ print ("Can't find queue to delete." )
143+
144+ print ("Deleting service" )
145+ try :
146+ ecs .delete_service (cluster = monitorcluster , service = ECS_SERVICE_NAME )
147+ except :
148+ print ("Couldn't delete service." )
149+
150+ print ("De-registering task" )
151+ taskArns = ecs .list_task_definitions ()
152+ for eachtask in taskArns ["taskDefinitionArns" ]:
153+ fulltaskname = eachtask .split ("/" )[- 1 ]
154+ ecs .deregister_task_definition (taskDefinition = fulltaskname )
155+
156+ print ("Removing cluster if it's not the default and not otherwise in use" )
157+ if monitorcluster != "default" :
158+ result = ecs .describe_clusters (clusters = [monitorcluster ])
159+ if (
160+ sum (
161+ [
162+ result ["clusters" ][0 ]["pendingTasksCount" ],
163+ result ["clusters" ][0 ]["runningTasksCount" ],
164+ result ["clusters" ][0 ]["activeServicesCount" ],
165+ ]
166+ )
167+ == 0
168+ ):
169+ ecs .delete_cluster (cluster = monitorcluster )
170+
171+ # Remove alarms that triggered monitor
172+ print ("Removing alarms that triggered Monitor" )
173+ cloudwatch .delete_alarms (
174+ AlarmNames = [
175+ f"ApproximateNumberOfMessagesVisibleisZero_{ monitorapp } " ,
176+ f"ApproximateNumberOfMessagesNotVisibleisZero_{ monitorapp } " ,
177+ ]
178+ )
179+
180+ # Remove Cloudwatch dashboard if created and cleanup desired
181+ if CLEAN_DASHBOARD .lower ()== 'true' :
182+ dashboard_list = cloudwatch .list_dashboards ()
183+ for entry in dashboard_list ["DashboardEntries" ]:
184+ if monitorapp in entry ["DashboardName" ]:
185+ cloudwatch .delete_dashboards (DashboardNames = [entry ["DashboardName" ]])
0 commit comments