projects:aerosol_code
Main code
This is in python, you can find a detailed guide on installing python on the NGW100 here. The reed_solomon library can be found on the wiki here
#!/media/mmcblk0p1/install/bin/python ##!/usr/bin/python import reed_solomon import serial import math import time import os TRUE=1 FALSE=0 DEG2RAD=math.pi/180.0 class updatest: pass mynumber="44<really think I'd be that silly?>" default_target_pressure=40 limit=[200,10000,20000] #our altitude ranges for the different samples max_flight_time=7000 callsign="AOPP" logname="flightlog.txt" Roottwotwomgovercda = (2*9.81*2/0.72)**0.5 #work this out for our payload mass, cda from http://members.aol.com/nakkarocketry/paratest.html iterations=0 Jxpoints=[] Jypoints=[] updatestuff=updatest() def gps_status(): print 'Retrieving GPS status' gpspos_parts=[''] while not gpspos_parts[0]=='$GPGGA': gpspos=ser_gr.readline() print "got" , gpspos gpspos_parts = gpspos.split(',') if ((gpspos_parts[6]=='1') or (gpspos_parts[6]=='2') or (gpspos_parts[6]=='3')): #2D or 3D fix/DGPS fix print 'GPS fix' , gpspos_parts[6] , 'ie valid' status=TRUE else: print 'GPS fix' , gpspos_parts[6] , 'ie not valid' status=FALSE return status def gps_data(): ser_gr.flushInput() #as gps strings will have built up over time print 'Retrieving GPS data' gpspos_parts=[''] while not gpspos_parts[0]=='$GPGGA': gpspos=ser_gr.readline() gpspos_parts = gpspos.split(',') gpstime=3600*float(gpspos_parts[1][0:2])+60*float(gpspos_parts[1][2:4])+float(gpspos_parts[1][4:]) latitude=float(gpspos_parts[2][0:2])+float(gpspos_parts[2][2:])/60.0 if gpspos_parts[3]=="S": latitude=-latitude longitude=float(gpspos_parts[4][0:3])+float(gpspos_parts[4][3:])/60.0 if gpspos_parts[5]=="W": longitude=-longitude if gpspos_parts[9]=='': gpspos_parts[9]='0.0' altitude=float(gpspos_parts[9]) #print gpstime,latitude,longitude,altitude return gpstime,latitude,longitude,altitude def db_stats(): ser_daughter.flushInput() #we done want to mess things up with old data in the buffer ser_daughter.write("V1\r\n") #the high voltage values V1=ser_daughter.readline() ser_daughter.write("V2\r\n") V2=ser_daughter.readline() ser_daughter.write("V3\r\n") V3=ser_daughter.readline() ser_daughter.write("I\r\n") #the total ioniser current IC=ser_daughter.readline() #ser_daughter.write("B\r\n") #battery voltage #VB=ser_daughter.readline() ser_daughter.write("D\r\n") #pressure difference PD=ser_daughter.readline() ser_daughter.write("T\r\n") #new - temperature sensor, to be fitted TS=ser_daughter.readline() return V1[:len(V1)-2],V2[:len(V2)-2],V3[:len(V3)-2],IC[:len(IC)-2],PD[:len(PD)-2],TS[:len(TS)-2] def HV_enable(n): stringy='H'+str(n)+"\r\n" ser_daughter.write(stringy) print ser_daughter.readline() def Set_pressure_target(P): ser_daughter.write('P'+str(P)+"\r\n") def air_density(alt): # NASA air temperature/pressure/density model if (alt < 11000.0): # below 11Km - Troposphere temp = 15.04 - (0.00649 * alt) pres = 101.29 *(((temp + 273.1) / 288.08)**5.256) else: if (alt < 25000.0): # between 11Km and 25Km - lower Stratosphere temp = -56.46 pres = 22.65 * math.exp(1.73 - ( 0.000157 * alt)) else: # above 25Km - upper Stratosphere temp = -131.21 + (0.00299 * alt) pres = 2.488 *(((temp + 273.1) / 216.6)**-11.388) return(pres / (0.2869 * (temp + 273.1))) def Updatepredict(Seconds,North,East,Altitude): Deltae=East-updatestuff.Oldeast Deltan=North-updatestuff.Oldnorth Deltaa=Altitude-updatestuff.Oldaltitude Deltat=Seconds-updatestuff.Oldseconds Averagealt=(updatestuff.Oldaltitude+Altitude)/2 if Deltaa>0: K=Roottwotwomgovercda*(air_density(Averagealt)**-0.5) #we now have the velocity of decent print "Descent velocity=",K K=Deltaa/K #time of decent for this layer print "descent time=",K K=K/Deltat #weighting for this layer print "Layer weighting=",K updatestuff.Predictedn+=(K+1)*Deltan #need to account for ascent drift updatestuff.Predictede+=Deltae*( (K*math.cos(((North + updatestuff.Oldnorth)/2)*DEG2RAD)/math.cos(updatestuff.Predictedn*DEG2RAD)) +1) updatestuff.Oldeast=East updatestuff.Oldnorth=North updatestuff.Oldaltitude=Altitude updatestuff.Oldseconds=Seconds print "predicted north=",updatestuff.Predictedn print "predicted east=",updatestuff.Predictede return updatestuff.Predictedn,updatestuff.Predictede def are_we_inside(gpstime,Xpos,Ypos,altitude): count=False print "checking",Xpos,Ypos for g in range(len(Jxpoints)): if g==len(Jxpoints)-1: gplus=0 else: gplus=g+1 if ((Jypoints[g]>Ypos and Jypoints[gplus]<Ypos) or (Jypoints[g]<Ypos and Jypoints[gplus]>Ypos))and(((Jxpoints[g]-Xpos)+((Ypos-Jypoints[g])*(Jxpoints[gplus]-Jxpoints[g])/(Jypoints[gplus]-Jypoints[g])))>0): count=not count print "intercept",g print 'we are inside=',count return count def load_kml(filepath): xpoints=[] ypoints=[] found=False for line in open(filepath,'r').readlines(): if not line.find("<coordinates>")==-1: found=True if not line.find("</coordinates>")==-1: found=False if found: line_split=line.split(',') if len(line_split)==3: xpoints+=[float(line_split[0])] ypoints+=[float(line_split[1])] return xpoints,ypoints def send_sms(s,target): target=str(target) if not len(target)==12: print "phone number incorrect lenght" return -1 if len(s)>160: print "text is too long" return -1 E=0 D=1 stringy="" t="" Bitstring=[0] for G in s: Numb=ord(G) for M in range(7): if (Numb >> M) & 1: Bitstring[E]+=D D*=2 if D==256: D=1 E=E+1 Bitstring.append(0) print "SMS" stringy="AT+CMGS="+str(14+(int(len(s)*7/8)+1))+"\r\n" #total lenght #print stringy ser_phone.write(stringy) print ser_phone.readline() #echo the command ser_phone.readline() print ser_phone.readline() # the "> " is recieved stringy="0011000C91" # SMS submit, 12 digit international for g in range(6): stringy+=(target[2*g+1]) stringy+=(target[2*g]) stringy+="0000AA" #PDU string to Mobile, 4 day validity stringy+="%.2X" % len(s) #datalenght for G in Bitstring: stringy+="%.2X" % G #a load of HEX stringy+=chr(26) #send it ser_phone.write(stringy) for n in range(4): print ser_phone.readline() # print stringy return 0 def get_smscen(): ser_phone.write("AT+CSCA?\r\n") ser_phone.readline() s=ser_phone.readline() s=s.split('"') return str(s[1:2])[3:15] def cutdown(): ser_gr.write("Cutdown... ") shutdown() os.system("echo 0 > /config/gpio/cuttertwo/enabled") #payload release time.sleep(6) os.system("echo 1 > /config/gpio/cuttertwo/enabled") #and we are on the way down print "released" def shutdown(): HV_enable(0) #all off Set_pressure_target(0) #pump off time.sleep(1) #wait, to avoid smoke contamination os.system("echo 0 > /config/gpio/cutterone/enabled") #cut plunger time.sleep(3) os.system("echo 1 > /config/gpio/cutterone/enabled") print "plunger cut" ser_gr.write("Shutdown\r\n") try: print 'AOPP aerosol experiment running' #log=open("/media/mmcblk0p1/"+logname,"a+") log=open("./"+logname,"a+") ser_gr=serial.Serial('/dev/ttyS2',4800, timeout=2, rtscts=1) #we have a CTS line from the radio #ser_gr=open("radio.txt","r") ser_daughter=serial.Serial('/dev/ttyS1',19200, timeout=2) #ser_daughter=open("daughter.txt","a+") ser_phone=serial.Serial('/dev/ttyS0',9600,timeout=4) #ser_phone=open("phone.txt","a+") print 'serial is open to gps,radio, and daughterboard' ser_gr.write("Hello world") #log.write("Hello world") reed_solomon.setup_rs() ser_gr.write(reed_solomon.encode_string("Hello, I am a reed solomon encoded string :P")) #log.write(reed_solomon.encode_string("Hello, I am a reed solomon encoded string :P")) # while not gps_status(): # ser_gr.write("waiting for the gps to lock") #log.write("waiting for the gps to lock") print 'ok, gps is ready, we are at:' ser_gr.write("GPS locked") #log.write("GPS locked") print gps_data() print 'now probing daughterboard' print db_stats() print 'DANGER: turning on HV1' HV_enable(1) print 'HV1 on, probing board' time.sleep(1) print db_stats() print 'testing other HV channels' for x in [2,3]: HV_enable(x) time.sleep(1) print 'HV channel' + str(x) + 'stats:' + str(db_stats()) HV_enable(0) print 'ok, now testing the pump @ 20%' Set_pressure_target(20) for i in range(20): print db_stats() time.sleep(1) Set_pressure_target(0) print 'pressure set to zero' for i in range(6): print db_stats() time.sleep(1) print 'Testing done' print 'opening KML cutdown file' Jxpoints,Jypoints=load_kml('./cutdown.kml') print Jxpoints,Jypoints print 'testing phone' ser_phone.write("AT\r\n") print ser_phone.readline() print ser_phone.readline() #smscen=get_smscen() send_sms("hello world",mynumber) ser_daughter.flushInput() ser_phone.flushInput() system_vector=gps_data() updatestuff.Predictedn=system_vector[1] updatestuff.Predictede=system_vector[2] updatestuff.Oldeast=system_vector[2] #start updatestuff.Oldnorth=system_vector[1] updatestuff.Oldaltitude=system_vector[3] updatestuff.Oldseconds=system_vector[0] count=0 calibrate=0 layercounter=0 cut_down='' system_vector=gps_data() maxaltitude=system_vector[3] startuptime=system_vector[0] HV=0 pump=FALSE print 'ok launch the balloon' while 1: print 'in loop' system_vector=gps_data() daughter_board=db_stats() datastring = ','.join(['%.0f' % system_vector[0]]+['%.6f' % system_vector[1]]+['%.6f' % system_vector[2]]+['%.0f' % system_vector[3]]+['%.4f' % updatestuff.Predictedn]+['%.4f' % updatestuff.Predictede]) datastring+=str(daughter_board)+cut_down print datastring log.write(datastring+"\r\n") ser_gr.write(reed_solomon.encode_string(callsign+datastring)) #all our telemetery over the radio link #log.write(reed_solomon.encode_string(callsign+datastring)) #t=reed_solomon.encode_string(callsign+datastring) #print t #print reed_solomon.decode_string(t,[]) if count==2: send_sms(datastring,mynumber) #every third time count=0 os.system("sync") count+=1 if calibrate==30: ser_daughter.write("K\r\n") #recalibrates the pressure sensor calibrate=0 calibrate+=1 if cut_down=='': if system_vector[3]/100>layercounter: #100m altitude layers layercounter+=1 #we move up a layer up=Updatepredict(*system_vector) log.write(str(up)+"\r\n") if not are_we_inside([0],up[1],up[0],[0]): #update and check against the polygon print "geofence cutdown" log.write("geofence cutdown\r\n") cutdown() cut_down='CG' if system_vector[0]-startuptime>max_flight_time: print "time cutdown" log.write("time cutdown\r\n") cutdown() cut_down='CT' #if daughter_board[4]<battery_limit: # print "voltage cutdown" # log.write("voltage cutdown\r\n") # cutdown() # cut_down='CB' if system_vector[3]-maxaltitude<-100: print "balloon popped" log.write("balloon popped\r\n") shutdown() cut_down='BP' if system_vector[3]>maxaltitude: maxaltitude=system_vector[3] if system_vector[3]>limit[0] and pump==FALSE: print 'turning on the pump (setting pressure target)' HV_enable(1) Set_pressure_target(default_target_pressure) log.write("HV1 on\r\n") pump=TRUE HV=1 if system_vector[3]>limit[1] and HV==1: #set the right HV channel for the altitude HV_enable(2) log.write("HV2 on\r\n") HV=2 if system_vector[3]>limit[2] and HV==2: HV_enable(3) log.write("HV3 on\r\n") HV=3 time.sleep(10) iterations+=1 #10 seconds sleep finally: log.close() # ser_phone.write("AT+CFUN=0\r\n") #phone off ser_phone.close() ser_gr.close() ser_daughter.close() print "bye"
projects/aerosol_code.txt · Last modified: 2008/07/19 23:33 by 127.0.0.1