Newer
Older
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
public class Antifurto {
private static String brokerUrl;
private String interruttore;
private String interruttoreOutputSuono;
private static ArrayList<String> topicsSub;
private Date date = new Date();
private String clientId = Long.toString(date.getTime()) + "-sub-pub"; // unique client id
private Automa automa;
private MqttClient mqttClient;
private static String mqttDomain;
private static String mqttSubdomain;
private final String CONF_FILE = "../res/CONF/conf.json";
private final String CONF_ZONA = "../res/CONF/zona.json";
public Antifurto(Automa automa) throws JSONException, IOException, MqttException {
this.automa = automa;
JSONObject jsonObject = new JSONObject(leggiFile(CONF_FILE));
brokerUrl = jsonObject.getString("protocol") + "://" + jsonObject.getString("broker") + ":" + jsonObject.getInt("port");
mqttDomain = jsonObject.getString("mqttDomain");
mqttSubdomain = jsonObject.getString("mqttSubdomain");
// Su questo topic ricevero' un messaggio del tipo {request:description}
// Dovro' quindi mandare la mia descrizione json
topicsSub.add("to/all");
// Su questo topic ricevero' le richieste di inviare il mio stato attuale
topicsSub.add("rpc/"+mqttDomain+"/"+mqttSubdomain+"/antifurto");
topicsSub.add("from/"+mqttDomain+"/"+mqttSubdomain+"/gpio/" + interruttore); // Sottoscrivo i messaggi che notificano il cambiamento di stato dell'interruttore
// Per ogni sensore di movimento, sottoscrivo i messaggi che notificano il loro cambiamento di stato
JSONArray sensori = jsonObject.getJSONArray("sensoriMovimento");
for(int i=0; i<sensori.length(); i++) {
topicsSub.add("from/"+mqttDomain+"/"+mqttSubdomain+"/gpio/" + sensori.get(i));
}
interruttoreOutputSuono = jsonObject.getString("outputSuono");
topicsSub.add("from/"+mqttDomain+"/"+mqttSubdomain+"/gpio/" + interruttoreOutputSuono); // Sottoscrivo i messaggi che notificano il cambiamento di stato dell'interruttore
this.mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
}
public void startClient(Esecutore esec, Publisher publisher) throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
mqttClient.setCallback(new SubscribeCallback(this, publisher, interruttore, interruttoreOutputSuono, esec, automa));
mqttClient.connect(options);
for(String t: topicsSub)
mqttClient.subscribe(t);
}
public void publishMethod(String topic, String msg) throws MqttException {
final MqttTopic msgTopic = mqttClient.getTopic(topic);
msgTopic.publish(new MqttMessage(msg.getBytes()));
}
private String leggiFile(String path) throws IOException {
String line;
String answer = "";
FileReader fileReader = new FileReader(path);
BufferedReader bufferedReader = new BufferedReader(fileReader);
while((line = bufferedReader.readLine()) != null) {
answer += line;
}
bufferedReader.close();
return answer;
}
/*
* Abbiamo 3 tipi di eventi:
* to Command Events - mi viene richiesto di eseguire un comando --> sottoscrivo
* from Status Events - io annuncio un cambiamento di stato
* (inviato a tutti i client che hanno sottoscritto il topic su cui faccio la publish)
* rpc Query Events - richiesta di inviare uno stato a un client (o microservizio). --> sottoscrivo
* Query implementati come coppia di eventi: <query event, status event>
* RICORDIAMO COME ABBIAMO IMPLEMENTATO LA RPC SU MQTT
*
*
*
* Il cuore della nostra business logic è un metodo in cui mi metto in attesa su mqtt e:
* se arriva un comando, chiamo il metodo responsabile per eseguire quel comando;
* se arriva un evento di stato, chiamo il metodo responsabile per gestire quell'evento di stato;
* se arriva una remote procedure call, attivo la risposta sulla remote procedure call
*
*
*
* Capire come gestire sistema a regole (vedi slide "Esempio" in "Applicazioni IoT "Cloud Based"")
*
*
*
* Sicurezza gestita con TLS
*
*
*
* FILE DA FARE per un servizio: (?)
* file che contiene le informazioni per connettersi al mosquitto locale
* file che dice qual'è la porta che lui deve esporre
* file che mi dice tutte le cose che lo configurano, ad esempio quanti device deve guardare
* ...
*
* I file di configurazione sono in JSON o in XML
* I dati persistiti si troveranno nella sottodirectory (chiamata con il nome del servizio) di Home
*
*
* Nella fase di configurazione potremo ancora modificare i file di configurazione (ad esempio aggiungendo device,
* regole). Dopodiché quando si fa partire il servizio si inizierà a usare l'altra parte di file system (ovvero il Local
* Dynamic File System) dove si andranno a salvare gli stati per renderli persistenti.
*
*
*
* Ho un processo principale che come unico compito ha quello di fare una fork() e creare un processo figlio.
* Sarà questo processo figlio a eseguire effettivamente il programma. Se il processo figlio termina (PER UN QUALCHE
* PROBLEMA), allora il processo padre se ne accorge e fa ripartire un nuovo processo figlio.
*
*
* Devo avere una thread di configurazione che obbedisce ai miei comandi (dati sulla HAT interface andando su
* 192.168.0.101:9001/configure.html (dove 192.168.0.101 è l'indirizzo della beaglebone)
*
*
* Ho una classe che implementa l'automa a stati finiti
*
*
*/
public static void main(String args[]) throws JSONException, IOException, MqttException {
//PER TESTARE PRIMA TOGLI IL TRY CATCH !!!!! POI QUANDO TUTTO FUNZIONA, METTI IL TRY CATCH, togli le throws E IMPLEMENTA LA RIPARTENZA!
// while(true) {
// try {
startSystem();
// }
// catch(Exception e) {
// // DA FARE:
// // qui metto il codice per far ripartire il processo: rileggo lo stato dei sensori e dell'interruttore,
// // capisco di conseguenza in quale stato dell'automa mi trovo e riparto.
// startSystem();
// }
// }
}
private static void startSystem() throws JSONException, IOException, MqttException {
MyQueue<Integer> codaVal = new MyQueue<Integer>();
MyQueue<Pair> codaMsg = new MyQueue<Pair>();
Automa automa = new Automa();
Antifurto antifurto = new Antifurto(automa);
Publisher publisher = new Publisher(codaMsg, antifurto);
Esecutore esec = new Esecutore(publisher, codaVal, automa, antifurto.interruttoreOutputSuono);
Timer timer = new Timer(30000,-5,esec,automa);
antifurto.startClient(esec, publisher);
publisher.start();
esec.start();
timer.start();
}
public static String getMqttTree() {
return mqttDomain+"/"+mqttSubdomain;
}