-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathAutomation.py
231 lines (199 loc) · 5.8 KB
/
Automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#! /usr/bin/python2.7
#importing all the modules
import os
import i2c
import spi
import sys
import temp
import mail
import signal
import time
import picamera as cam
import RPi.GPIO as GPIO
from pubnub import Pubnub
from twilio.rest import Client
from multiprocessing import Process
# PubNub channel names: channel1 is the channel this script publishes
# status to, channel2 is the channel it subscribes to for commands.
channel1 = "Txt-data"
channel2 = "Rcv-data"

# GPIO pin numbers for each attached device (BCM numbering is selected
# in init()).
BULB = 23
TAP = 24
FAN = 16
MOTOR = 20
PIR = 21
WARNING = 12

# Twilio REST client used to send the SMS notifications.
# NOTE(review): the two credential strings are embedded in the source;
# consider loading them from the environment or a config file instead.
client = Client("Axxxxxxxxxxxxxxxxxxxxxxa", "exxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxb")
def init():
    """Configure GPIO, the SIGCHLD handler, PubNub and the ADC.

    Returns:
        The Pubnub object created with the publish/subscribe keys.
    """
    # Use the Broadcom (BCM) pin numbering scheme and silence the
    # runtime warnings RPi.GPIO would otherwise emit.
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)

    # Direction of every pin, configured in this order.
    pin_modes = (
        (BULB, GPIO.OUT),     # bulb
        (FAN, GPIO.OUT),      # fan
        (TAP, GPIO.OUT),      # tap
        (MOTOR, GPIO.OUT),    # motor
        (PIR, GPIO.IN),       # PIR motion sensor
        (WARNING, GPIO.OUT),  # warning output
    )
    for pin, mode in pin_modes:
        GPIO.setup(pin, mode)

    # The loads are wired active low, so driving a pin high switches
    # the attached device off.
    for pin in (BULB, FAN, MOTOR, TAP, WARNING):
        GPIO.output(pin, True)

    # Replace the default SIGCHLD action with handler1.
    signal.signal(signal.SIGCHLD, handler1)

    hub = Pubnub(publish_key='pub-cxxxxxxxxxxxxxxxxxxxxxxxxxxxxx3', subscribe_key='sub-cxxxxxxxxxxxxxxxxxxxxxxxxxx1')

    # Initialise the ADC before any sensor is read.
    spi.adc_setup()
    return hub
def callback(m):
    """Print the object PubNub passes back after a publish call.

    main() registers this function as both the success callback and the
    error callback of every publish.
    """
    print(m)
def __callback__(m, channel):
    """Apply a received command message to the bulb, fan and motor pins.

    For each of the keys 'bulb', 'fan' and 'motor' present in ``m``, a
    value equal to 1 drives the matching pin low (device on); any other
    value drives it high (device off). ``channel`` is accepted because
    the subscribe API supplies it, but it is not used.
    """
    # (message key, GPIO pin, text printed when on, text printed when off)
    devices = (
        ('bulb', BULB, "bulb on", "bulb off"),
        ('fan', FAN, "fan on", "fan off"),
        ('motor', MOTOR, "motor on", "motor off"),
    )
    for key, pin, on_text, off_text in devices:
        if key not in m:
            continue
        if m[key] == 1:
            # Active low: False switches the device on.
            GPIO.output(pin, False)
            print(on_text)
        else:
            GPIO.output(pin, True)
            print(off_text)
def __error__(m):
    """Print the error object reported for the PubNub subscription."""
    print(m)
def handler1(signum, frame):
    """SIGCHLD handler: reap every child that has already exited.

    Several SIGCHLD signals can be merged into a single delivery, so one
    call may have more than one finished child to collect. The children
    are collected with a non-blocking waitpid loop; a blocking wait here
    could stall the whole process when the signal was raised for a child
    that has not actually terminated.

    Args:
        signum: Signal number supplied by the signal module (unused).
        frame: Interrupted stack frame supplied by the signal module
            (unused).
    """
    while True:
        try:
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            # No child processes exist at all (ECHILD); nothing to reap.
            return
        if pid == 0:
            # Children exist, but none of them has exited yet.
            return
        print("child exited")
def main():
    """Initialise the system and start the four sensor subprocesses.

    Calls init(), subscribes to ``channel2`` for remote commands, then
    starts one subprocess each for the LDR, IR, temperature and PIR
    monitoring loops defined below. The loops never return.
    """
    pubnub = init()
    pubnub.subscribe(channels=channel2, callback=__callback__, error=__error__)

    def _show_time():
        """Read the RTC, clear the terminal, print the time and return it."""
        now = i2c.i2c_time()
        os.system("clear")
        # Single-argument form; prints the same text as the two-item
        # print statement it replaces.
        print("Time :  %s" % (now,))
        return now

    def _send_sms(sender, text):
        """Send ``text`` by SMS from ``sender`` and return the Twilio message."""
        return client.messages.create(
            to="+919446047003",
            from_=sender,
            body=text)

    # Automation using LDR to turn on and turn off light
    def ldr():
        """Switch the bulb according to the LDR reading on ADC channel 0."""
        bulb_flg = 0  # 1 while this loop has the bulb switched on
        while True:
            time.sleep(1)  # only paces the debug output
            t = _show_time()
            LDR = spi.adc_read(0)
            print("LDR :  %s" % (LDR,))  # debug output
            if LDR > 3600 and bulb_flg == 0:
                # Re-read after 3 s so a brief spike does not switch
                # the bulb.
                time.sleep(3)
                LDR = spi.adc_read(0)
                if LDR > 3600:
                    data = {
                        'BULB': 'ON',
                    }
                    pubnub.publish(channel1, data, callback=callback, error=callback)
                    GPIO.output(BULB, False)
                    message = _send_sms("+14xxxxxxxx2", "Light is turning on at " + t)
                    print(message.sid)
                    bulb_flg = 1
            elif LDR <= 3600 and bulb_flg == 1:
                GPIO.output(BULB, True)
                bulb_flg = 0

    # Automation using IR to turn on and off Tap
    def ir():
        """Open the tap for 3 s whenever the IR reading on ADC channel 1 drops below 3000."""
        while True:
            time.sleep(1)  # only paces the debug output
            _show_time()
            IR = spi.adc_read(1)
            print("IR :  %s" % (IR,))  # debug output
            if IR < 3000:
                GPIO.output(TAP, False)
                time.sleep(3)
                GPIO.output(TAP, True)

    # Automation using the temperature sensor to turn on and off ac/fan
    def temper():
        """Switch the fan/AC according to the value from temp.read_temp()."""
        fan_flg = 0  # 1 while this loop has the fan switched on
        while True:
            time.sleep(1)  # only paces the debug output
            t = _show_time()
            tem = temp.read_temp()
            print("Temperature :  %s  c" % (tem,))  # debug output
            if tem > 29.0 and fan_flg == 0:
                # Re-read after 3 s to confirm the high reading.
                time.sleep(3)
                tem = temp.read_temp()
                if tem > 29.0:
                    data = {
                        'FAN': 'ON',
                    }
                    pubnub.publish(channel1, data, callback=callback, error=callback)
                    GPIO.output(FAN, False)
                    message = _send_sms(
                        "+14xxxxxxx2",
                        "AC is turning on at " + t + "\ntemperature : " + str(tem))
                    print(message.sid)
                    fan_flg = 1
            elif tem <= 29.0 and fan_flg == 1:
                GPIO.output(FAN, True)
                fan_flg = 0

    # Theft detection using PIR
    def pir():
        """Raise a security alert whenever the PIR input reads high."""
        while True:
            t = _show_time()
            if GPIO.input(PIR):
                print("Security alert")
                data = {
                    'Security': 'Alert',
                }
                pubnub.publish(channel1, data, callback=callback, error=callback)
                # The context manager releases the camera even when the
                # capture fails, so a later alert can open it again.
                with cam.PiCamera() as c:
                    c.capture("img.jpg")
                GPIO.output(WARNING, False)
                _send_sms("+14xxxxxxxx2", "Some one at Home \n\nTime: " + t)
                s = mail.my_mail("gmail username", "password")
                s.send_mail("[email protected]", "Attention:Security Issue", "Some one in your home", "./img.jpg")
                time.sleep(10)
                GPIO.output(WARNING, True)
            time.sleep(1)  # only paces the debug output

    # Create all four subprocesses first, then start them in order.
    workers = [Process(target=loop) for loop in (ldr, ir, temper, pir)]
    for worker in workers:
        worker.start()
# Start the automation only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()