1. Python untuk IoT
Python telah menjadi salah satu bahasa pemrograman paling populer di dunia, dan dampaknya kini terasa kuat di ekosistem Internet of Things (IoT). Dengan sintaks yang bersih, ekosistem library yang masif, dan dukungan lintas platform, Python menawarkan jalur tercepat dari ide ke prototipe fungsional dalam proyek IoT.
Dalam konteks IoT, Python berperan sebagai "otak" pada perangkat komputasi yang lebih powerful seperti Raspberry Pi, BeagleBone, atau Jetson Nano. Berbeda dari Arduino atau ESP32 yang menggunakan bahasa C/C++, Python memungkinkan pengembang membangun sistem IoT yang kompleks dengan fitur-fitur tingkat tinggi seperti threading, akses file sistem, database, dan jaringan — semuanya dalam satu bahasa.
Mengapa Python untuk IoT?
| Keunggulan | Penjelasan |
|---|---|
| Sintaks Sederhana | Bisa langsung menulis kode tanpa kompilasi panjang, cocok untuk prototyping cepat |
| Library Kaya | paho-mqtt, RPi.GPIO, gpiozero, smbus2, APScheduler — semua tersedia via pip |
| Cross-Platform | Jalan di Linux, macOS, Windows, dan berbagai single-board computer |
| Komunitas Besar | Banyak tutorial, forum, dan contoh proyek IoT berbasis Python |
| Integrasi Mudah | Mudah terhubung ke REST API, database, MQTT broker, dan layanan cloud |
| Async Support | asyncio dan library async memungkinkan handling simultan banyak koneksi |
Python vs MicroPython: Mana yang Cocok?
Banyak pemula bingung antara Python biasa dan MicroPython. Keduanya memiliki peran berbeda dalam ekosistem IoT:
| Aspek | Python (CPython) | MicroPython |
|---|---|---|
| Jalur di | PC, Raspberry Pi, SBC | ESP32, ESP8266, RP2040, mikrokontroler |
| RAM Minimum | ~50 MB | ~256 KB |
| File System | Full access | Terbatas (FAT/LittleFS) |
| Library paho-mqtt | ✅ Berjalan sempurna | ❌ Tidak kompatibel langsung |
| Threading | ✅ Multithreading penuh | Limited |
| Cocok untuk | Gateway, server, edge computing | Node sensor/aktuator ringan |
Tutorial ini berfokus pada Python (CPython) yang berjalan di Raspberry Pi atau PC. Jika Anda ingin memprogram ESP32 langsung dengan Python, pelajari MicroPython secara terpisah. Namun, konsep dasar seperti MQTT dan GPIO tetap berlaku di kedua platform.
┌──────────────────────────────────────────────────────────┐ │ SISTEM IoT DENGAN PYTHON │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │ │ │ SENSOR │ │ RASPBERRY │ │ MQTT │ │ │ │ DHT11 ├───►│ PI 4B ├──►│ BROKER │ │ │ │ BME280 │ │ │ │ (Mosquitto)│ │ │ │ LDR │ │ Python 3.x │ └─────┬──────┘ │ │ └──────────────┘ │ │ │ │ │ │ paho-mqtt │ │ │ │ ┌──────────────┐ │ RPi.GPIO │ ▼ │ │ │ AKTUATOR │◄───│ gpiozero │ ┌────────────┐ │ │ │ LED / Relay │ │ APScheduler │ │ DASHBOARD │ │ │ │ Buzzer │ │ │ │ / Server │ │ │ │ Motor │ └──────────────┘ └────────────┘ │ │ └──────────────┘ │ └──────────────────────────────────────────────────────────┘
2. Setup Environment Python
Sebelum mulai menulis kode, kita perlu menyiapkan lingkungan kerja yang bersih dan terisolasi. Penggunaan virtual environment adalah praktik terbaik agar dependensi proyek IoT tidak bertabrakan dengan package sistem operasi.
Langkah 1: Instalasi Python 3
Sebagian besar distribusi Linux sudah menyertakan Python 3. Namun, pastikan Anda menggunakan versi 3.9 atau lebih baru:
# Cek versi Python yang terinstal python3 --version # Instalasi di Raspberry Pi OS / Debian / Ubuntu sudo apt update sudo apt install python3 python3-pip python3-venv -y # Instalasi di Fedora / CentOS sudo dnf install python3 python3-pip -y
Langkah 2: Buat Virtual Environment
# Buat direktori proyek mkdir ~/iot-project && cd ~/iot-project # Buat virtual environment python3 -m venv venv # Aktifkan virtual environment source venv/bin/activate # Verifikasi which python3 # Output: /home/pi/iot-project/venv/bin/python3
Langkah 3: Instalasi Library IoT
Berikut daftar library utama yang kita butuhkan beserta fungsinya:
| Package | Fungsi | Instalasi |
|---|---|---|
| paho-mqtt | MQTT client untuk komunikasi pub/sub | pip install paho-mqtt |
| RPi.GPIO | Kontrol GPIO pin Raspberry Pi | pip install RPi.GPIO |
| gpiozero | API level tinggi untuk GPIO (LED, sensor, aktuator) | pip install gpiozero |
| smbus2 | Komunikasi I2C untuk sensor seperti BME280 | pip install smbus2 |
| adafruit-circuitpython-dht | Pembacaan sensor DHT11/DHT22 | pip install adafruit-circuitpython-dht |
| APScheduler | Penjadwalan tugas otomatis | pip install APScheduler |
| requests | HTTP client untuk REST API | pip install requests |
# Instalasi semua library sekaligus pip install paho-mqtt RPi.GPIO gpiozero smbus2 \ adafruit-circuitpython-dht APScheduler requests # Simpan ke requirements.txt pip freeze > requirements.txt # Untuk reinstal di perangkat lain: # pip install -r requirements.txt
Library RPi.GPIO hanya berjalan di hardware Raspberry Pi. Jika Anda ingin mengembangkan di PC biasa (Windows/Linux), gunakan mode testing dengan os.environ["MOCK"] = "1" atau gunakan library pigpio yang mendukung remote GPIO. Alternatif lain adalah menulis kode dengan abstraksi agar mudah dipindah ke RPi.
3. MQTT Client dengan paho-mqtt
MQTT (Message Queuing Telemetry Transport) adalah protokol komunikasi ringan berbasis publish/subscribe yang menjadi standar de facto di dunia IoT. Library paho-mqtt adalah implementasi MQTT paling populer untuk Python.
Konsep Dasar MQTT
Publisher 1 ┌──────────────┐ Subscriber A
(Sensor Suhu) ────►│ │──────► (Dashboard)
│ MQTT BROKER │
Publisher 2 │ (Mosquitto) │ Subscriber B
(Sensor Gas) ────►│ │──────► (Alert System)
│ │
Subscriber C │ │
(Telegram Bot) ◄──│ │
└──────────────┘
Topik (Topics):
├── home/sensor/suhu → Publisher 1 → Subscriber A
├── home/sensor/gas → Publisher 2 → Subscriber B
├── home/sensor/# → Subscriber C (wildcard)
└── home/actuator/lampu → Subscriber D → Aktuator
3.1 — MQTT Subscriber Sederhana
Subscriber adalah komponen yang mendengarkan pesan dari broker. Mari kita mulai dengan subscriber yang menerima pesan dari topic tertentu:
"""
MQTT Subscriber - Menerima data dari topic IoT
IoTHub - https://iothub.id
"""
import paho.mqtt.client as mqtt
# --- Konfigurasi ---
BROKER_HOST = "localhost" # Alamat MQTT broker
BROKER_PORT = 1883 # Port default MQTT
CLIENT_ID = "subscriber-rpi"
TOPIC = "home/sensor/suhu"
def on_connect(client, userdata, flags, rc, properties=None):
"""Callback saat koneksi ke broker berhasil."""
if rc == 0:
print(f"[OK] Terhubung ke MQTT broker ({BROKER_HOST}:{BROKER_PORT})")
# Subscribe ke topik setelah koneksi berhasil
client.subscribe(TOPIC, qos=1)
print(f"[OK] Subscribe ke topik: {TOPIC}")
else:
print(f"[ERROR] Gagal koneksi, kode: {rc}")
def on_message(client, userdata, msg):
"""Callback saat menerima pesan dari topik yang di-subscribe."""
payload = msg.payload.decode("utf-8")
print(f"[DATA] {msg.topic} → {payload}")
def on_disconnect(client, userdata, rc, properties=None):
"""Callback saat koneksi terputus."""
if rc != 0:
print(f"[WARN] Koneksi terputus (rc={rc}), mencoba reconnect...")
# --- Setup Client ---
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)
# Set Last Will and Testament (LWT)
# Pesan ini dikirim broker jika client disconnect mendadak
client.will_set(
topic="home/status/subscriber",
payload="OFFLINE",
qos=1,
retain=True
)
# Hubungkan callback
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Koneksi ke broker
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# Mulai loop untuk menerima pesan secara terus-menerus
print("[INFO] Subscriber berjalan... Tekan Ctrl+C untuk berhenti.")
client.loop_forever()
3.2 — MQTT Publisher Sederhana
Publisher adalah komponen yang mengirim pesan ke broker. Berikut contoh publisher yang mengirim data suhu simulasi:
"""
MQTT Publisher - Mengirim data sensor ke broker
IoTHub - https://iothub.id
"""
import json
import time
import random
import paho.mqtt.client as mqtt
BROKER_HOST = "localhost"
BROKER_PORT = 1883
CLIENT_ID = "publisher-sensor"
TOPIC = "home/sensor/suhu"
def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
print(f"[OK] Publisher terhubung ke broker")
else:
print(f"[ERROR] Gagal koneksi, kode: {rc}")
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)
# Aktifkan autentikasi jika broker membutuhkan
# client.username_pw_set("iot_user", "rahasia123")
# Set LWT untuk publisher
client.will_set("home/status/publisher", "OFFLINE", qos=1, retain=True)
client.on_connect = on_connect
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# Publish status online (retain=True agar subscriber baru langsung tahu)
client.publish("home/status/publisher", "ONLINE", qos=1, retain=True)
# Loop publish data
try:
print("[INFO] Mulai mengirim data...")
while True:
# Simulasi pembacaan sensor
suhu = round(random.uniform(24.0, 35.0), 1)
kelembaban = round(random.uniform(40.0, 80.0), 1)
# Buat payload JSON
payload = json.dumps({
"suhu": suhu,
"kelembaban": kelembaban,
"unit": "celcius",
"timestamp": int(time.time())
})
# Publish dengan QoS 1 (at least once delivery)
result = client.publish(TOPIC, payload, qos=1)
print(f"[KIRIM] {payload}")
time.sleep(5) # Kirim setiap 5 detik
except KeyboardInterrupt:
print("\n[INFO] Publisher dihentikan.")
client.publish("home/status/publisher", "OFFLINE", qos=1, retain=True)
client.disconnect()
3.3 — Memahami QoS (Quality of Service)
| Level | Nama | Penjelasan | Cocok untuk |
|---|---|---|---|
| QoS 0 | At Most Once | Pesan dikirim sekali tanpa konfirmasi. Bisa hilang. | Data non-kritis,高频 update |
| QoS 1 | At Least Once | Pesan dijamin sampai, tapi mungkin duplikat | Sensor suhu, status aktuator |
| QoS 2 | Exactly Once | Pesan dijamin sampai tepat sekali. Paling lambat. | Kritis: alarm, perintah aktuator |
LWT adalah pesan khusus yang diatur saat client pertama kali connect ke broker. Jika client disconnect secara tidak normal (misalnya mati listrik), broker akan otomatis mempublikasikan LWT ke topik yang ditentukan. Ini sangat berguna untuk memantau status perangkat IoT secara real-time. Pastikan set retain=True agar subscriber baru langsung mengetahui status terakhir.
4. GPIO Control via Python
GPIO (General Purpose Input/Output) adalah pin digital pada Raspberry Pi yang bisa dikonfigurasi sebagai input atau output. Dengan Python, kita bisa mengontrol LED, relay, motor, dan membaca status tombol atau sensor digital.
4.1 — Menggunakan RPi.GPIO
Library RPi.GPIO adalah library klasik yang memberikan kontrol langsung ke pin GPIO Raspberry Pi:
"""
GPIO Control dengan RPi.GPIO
IoTHub - https://iothub.id
"""
import RPi.GPIO as GPIO
import time
# --- Setup Pin ---
LED_PIN = 17 # GPIO17 (fisik pin 11)
BUTTON_PIN = 27 # GPIO27 (fisik pin 13)
# Gunakan penomoran BOARD atau BCM
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# Setup pin LED sebagai OUTPUT
GPIO.setup(LED_PIN, GPIO.OUT)
# Setup pin Tombol sebagai INPUT dengan pull-up internal
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# --- Blink LED ---
print("[INFO] Blink LED pada GPIO17...")
for i in range(5):
GPIO.output(LED_PIN, GPIO.HIGH) # Nyalakan LED
print(f" Blink {i+1}: ON")
time.sleep(0.5)
GPIO.output(LED_PIN, GPIO.LOW) # Matikan LED
print(f" Blink {i+1}: OFF")
time.sleep(0.5)
# --- Baca Tombol ---
print("\n[INFO] Tunggu tekan tombol pada GPIO27...")
try:
while True:
# Tombol active-low (ditekan = LOW)
if GPIO.input(BUTTON_PIN) == GPIO.LOW:
GPIO.output(LED_PIN, GPIO.HIGH)
print("[EVENT] Tombol ditekan! LED menyala.")
else:
GPIO.output(LED_PIN, GPIO.LOW)
time.sleep(0.1)
except KeyboardInterrupt:
print("\n[INFO] Program dihentikan.")
finally:
GPIO.cleanup() # Selalu bersihkan GPIO saat selesai!
4.2 — Menggunakan gpiozero (API Level Tinggi)
Library gpiozero menyediakan API yang jauh lebih bersih dan Pythonic. Sangat direkomendasikan untuk proyek baru:
"""
GPIO Control dengan gpiozero (lebih Pythonic)
IoTHub - https://iothub.id
"""
from gpiozero import LED, Button, Buzzer
from signal import pause
import time
# --- Inisialisasi Komponen ---
led = LED(17) # LED pada GPIO17
button = Button(27) # Tombol pada GPIO27
buzzer = Buzzer(22) # Buzzer pada GPIO22
# --- Blink LED dengan method gpiozero ---
print("[INFO] Blink LED dengan gpiozero...")
led.blink(on_time=0.5, off_time=0.5, n=3, background=False)
# --- Event-driven: Tombol menyalakan LED ---
print("[INFO] Mode event-driven. Tekan tombol untuk LED ON.")
button.when_pressed = lambda: (led.on(), print("[EVENT] LED ON"))
button.when_released = lambda: (led.off(), print("[EVENT] LED OFF"))
# Jalankan event loop
pause()
RPi.GPIO: Cocok jika Anda terbiasa dengan gaya pemrograman Arduino/low-level. Anda harus manual mengatur setup, input/output, dan cleanup. gpiozero: Lebih cocok untuk pemula karena menggunakan pendekatan object-oriented — cukup buat objek LED, panggil method-nya. Keduanya bisa dipakai bersamaan, tapi hindari mengontrol pin yang sama dari kedua library secara bersamaan.
Pin Mapping Raspberry Pi (BCM Mode)
| Fungsi | Pin BCM | Pin Fisik | Catatan |
|---|---|---|---|
| LED A | GPIO 17 | Pin 11 | Cocok untuk output LED/relay |
| Tombol A | GPIO 27 | Pin 13 | Input dengan pull-up internal |
| Buzzer | GPIO 22 | Pin 15 | Output aktif HIGH |
| Sensor IR | GPIO 23 | Pin 16 | Input digital dari sensor IR |
| SPI CE0 | GPIO 8 | Pin 24 | Pin SPI chip enable |
| I2C SDA | GPIO 2 | Pin 3 | Data line I2C |
| I2C SCL | GPIO 3 | Pin 5 | Clock line I2C |
5. Membaca Data Sensor dengan Python
Salah satu kebutuhan utama dalam proyek IoT adalah membaca data dari sensor fisik. Pada bagian ini, kita akan mempelajari cara membaca sensor DHT11/DHT22 (suhu & kelembaban) dan BME280 (suhu, kelembaban & tekanan udara) menggunakan Python di Raspberry Pi.
5.1 — Membaca Sensor DHT11/DHT22
"""
Pembacaan Sensor DHT11/DHT22 dengan Python
IoTHub - https://iothub.id
Rangkaian:
DHT11 Pin 1 (VCC) → 3.3V Raspberry Pi
DHT11 Pin 2 (DATA) → GPIO4 (Pin 7) + Resistor 10kΩ ke 3.3V
DHT11 Pin 4 (GND) → GND Raspberry Pi
"""
import time
import board
import adafruit_dht
# --- Inisialisasi Sensor ---
# Untuk DHT11 gunakan adafruit_dht.DHT11(board.D4)
# Untuk DHT22 gunakan adafruit_dht.DHT22(board.D4)
sensor = adafruit_dht.DHT11(board.D4)
# Interval pembacaan (detik)
interval = 5
def baca_sensor():
"""Membaca data dari sensor DHT dan mengembalikan dict."""
try:
suhu = sensor.temperature # Celsius
kelembaban = sensor.humidity # Persen
if suhu is None or kelembaban is None:
return None
return {
"suhu": round(suhu, 1),
"kelembaban": round(kelembaban, 1),
"timestamp": int(time.time())
}
except RuntimeError as e:
# Error umum DHT: CRC check failed
print(f"[WARN] Sensor error: {e}")
return None
except Exception as e:
print(f"[ERROR] Error tak terduga: {e}")
return None
def main():
print("=== Pembaca Sensor DHT11 ===")
print(f"Membaca setiap {interval} detik...\n")
try:
while True:
data = baca_sensor()
if data:
print(f"🌡️ Suhu : {data['suhu']} °C")
print(f"💧 Kelembaban : {data['kelembaban']} %")
print(f"⏰ Timestamp : {data['timestamp']}")
print("---")
else:
print("[SKIP] Pembacaan gagal, coba lagi...\n")
time.sleep(interval)
except KeyboardInterrupt:
print("\n[INFO] Program dihentikan.")
finally:
sensor.exit()
if __name__ == "__main__":
main()
5.2 — Membaca Sensor BME280 via I2C (smbus2)
Sensor BME280 menggunakan protokol I2C dan mengukur suhu, kelembaban, dan tekanan udara. Berikut cara membacanya langsung menggunakan library smbus2:
"""
Pembacaan Sensor BME280 via I2C dengan smbus2
IoTHub - https://iothub.id
Rangkaian:
BME280 VCC → 3.3V
BME280 GND → GND
BME280 SDA → GPIO2 (Pin 3)
BME280 SCL → GPIO3 (Pin 5)
BME280 SDO → GND (Alamat I2C: 0x76)
"""
import time
import struct
from smbus2 import SMBus
# Alamat default BME280 (jika SDO ke GND = 0x76, ke VCC = 0x77)
BME280_ADDR = 0x76
# Register BME280
REG_ID = 0xD0
REG_RESET = 0xE0
REG_CTRL_HUM = 0xF2
REG_CTRL_MEAS = 0xF4
REG_CONFIG = 0xF5
REG_DATA_START = 0xF7
# Kalibrasi data register
CALIB_26_41 = 0xE1
CALIB_00_25 = 0x88
CALIB_00_25_LEN = 26
CALIB_26_41_LEN = 7
class BME280:
"""Driver sederhana untuk sensor BME280."""
def __init__(self, bus_number=1, address=BME280_ADDR):
self.bus = SMBus(bus_number)
self.addr = address
# Verifikasi ID chip (harus 0x60)
chip_id = self.bus.read_byte_data(self.addr, REG_ID)
if chip_id != 0x60:
raise RuntimeError(f"BME280 tidak terdeteksi! Chip ID: {chip_id:#x}")
# Baca data kalibrasi
self._load_calibration()
# Konfigurasi: oversampling, mode normal
self.bus.write_byte_data(self.addr, REG_CTRL_HUM, 0x05) # Humidity x16
self.bus.write_byte_data(self.addr, REG_CTRL_MEAS, 0xB7) # Temp x16, Press x16, Normal mode
self.bus.write_byte_data(self.addr, REG_CONFIG, 0xA0) # Standby 1000ms
def _load_calibration(self):
"""Baca register kalibrasi untuk kompensasi suhu & tekanan."""
cal26 = self.bus.read_i2c_block_data(self.addr, CALIB_26_41, CALIB_26_41_LEN)
cal00 = self.bus.read_i2c_block_data(self.addr, CALIB_00_25, CALIB_00_25_LEN)
# Kompensasi suhu
self.dig_T1 = struct.unpack(' 2047:
self.dig_H4 -= 4096
self.dig_H5 = (cal26[6] << 4) | ((cal26[5] >> 4) & 0x0F)
if self.dig_H5 > 2047:
self.dig_H5 -= 4096
self.dig_H6 = struct.unpack('> 4)
raw_temp = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
raw_hum = (data[6] << 8) | data[7]
return raw_temp, raw_press, raw_hum
def read_compensated(self):
"""Baca dan kompensasi data suhu, tekanan, kelembaban."""
raw_temp, raw_press, raw_hum = self.read_raw()
# Kompensasi suhu (dari datasheet BME280)
var1 = (raw_temp / 16384.0 - self.dig_T1 / 1024.0) * self.dig_T2
var2 = ((raw_temp / 131072.0 - self.dig_T1 / 8192.0) ** 2) * self.dig_T3
t_fine = var1 + var2
suhu = t_fine / 5120.0
# Kompensasi kelembaban
h = t_fine - 76800.0
h = (raw_hum - (self.dig_H4 * 64.0 + self.dig_H5 / 16384.0 * h)) * \
(self.dig_H2 / 65536.0 * (1.0 + self.dig_H6 / 67108864.0 * h *
(1.0 + self.dig_H3 / 67108864.0 * h)))
kelembaban = max(0.0, min(h, 100.0))
return round(suhu, 2), round(kelembaban, 2)
# --- Program Utama ---
if __name__ == "__main__":
sensor = BME280()
print("=== BME280 Sensor Reader ===\n")
try:
while True:
suhu, kelembaban = sensor.read_compensated()
print(f"🌡️ Suhu : {suhu} °C")
print(f"💧 Kelembaban : {kelembaban} %")
print("---")
time.sleep(3)
except KeyboardInterrupt:
print("\n[INFO] Selesai.")
BME280 juga bisa dibaca menggunakan library adafruit-circuitpython-bme280 yang lebih sederhana. Namun, contoh di atas menunjukkan cara membaca register langsung menggunakan smbus2 agar Anda memahami cara kerja komunikasi I2C di level register. Untuk proyek produksi, gunakan library tingkat tinggi untuk kemudahan.
6. Scheduling & Cron Jobs
Sistem IoT yang andal membutuhkan penjadwalan tugas otomatis — misalnya mengirim laporan setiap jam, membersihkan log setiap tengah malam, atau menjalankan kalibrasi sensor secara berkala. Python menawarkan beberapa pilihan untuk ini.
6.1 — APScheduler: Penjadwalan di dalam Aplikasi Python
APScheduler adalah library scheduling paling fleksibel untuk Python. Berbeda dari cron, APScheduler berjalan di dalam proses Python dan mendukung tiga jenis scheduler: interval (periodik), date (sekali di waktu tertentu), dan cron (polanya mirip system cron).
"""
APScheduler - Penjadwalan Tugas Otomatis untuk IoT
IoTHub - https://iothub.id
"""
import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("iot-scheduler")
def baca_sensor_suhu():
"""Tugas periodik: Baca data suhu setiap 10 detik."""
import random
suhu = round(random.uniform(25.0, 32.0), 1)
timestamp = datetime.now().strftime("%H:%M:%S")
logger.info(f"[SENSOR] Suhu terbaca: {suhu} °C")
def kirim_laporan_harian():
"""Tugas cron: Kirim laporan setiap jam 08:00 pagi."""
logger.info("[LAPORAN] Mengirim laporan harian ke server...")
def bersihkan_log():
"""Tugas cron: Bersihkan log file setiap tengah malam."""
logger.info("[CLEANUP] Membersihkan log lama...")
def kalibrasi_sensor():
"""Tugas interval: Kalibrasi sensor setiap 6 jam."""
logger.info("[KALIBRASI] Menjalankan kalibrasi sensor BME280...")
# --- Setup Scheduler ---
scheduler = BackgroundScheduler()
# Tugas 1: Baca sensor setiap 10 detik
scheduler.add_job(
func=baca_sensor_suhu,
trigger=IntervalTrigger(seconds=10),
id="baca_sensor",
name="Pembacaan Sensor Suhu",
replace_existing=True
)
# Tugas 2: Laporan harian jam 08:00
scheduler.add_job(
func=kirim_laporan_harian,
trigger=CronTrigger(hour=8, minute=0),
id="laporan_harian",
name="Laporan Harian"
)
# Tugas 3: Bersihkan log tengah malam
scheduler.add_job(
func=bersihkan_log,
trigger=CronTrigger(hour=0, minute=0),
id="bersihkan_log",
name="Pembersihan Log"
)
# Tugas 4: Kalibrasi setiap 6 jam
scheduler.add_job(
func=kalibrasi_sensor,
trigger=IntervalTrigger(hours=6),
id="kalibrasi",
name="Kalibrasi Sensor"
)
# Mulai scheduler
scheduler.start()
logger.info("[OK] Scheduler dimulai dengan 4 tugas terjadwal.")
# Tampilkan jadwal yang aktif
for job in scheduler.get_jobs():
logger.info(f" 📋 {job.name} → next run: {job.next_run_time}")
# Jalankan selama 30 detik untuk demo
try:
logger.info("[INFO] Scheduler berjalan selama 30 detik...")
time.sleep(30)
except KeyboardInterrupt:
pass
scheduler.shutdown()
logger.info("[INFO] Scheduler dihentikan.")
6.2 — Perbandingan: APScheduler vs System Cron
| Fitur | APScheduler | System Cron |
|---|---|---|
| Cara kerja | Di dalam proses Python | OS-level, menjalankan script baru |
| Interaksi antar tugas | ✅ Bisa berbagi state | ❌ Terisolasi |
| Misfire handling | ✅ Grace period & retry | ❌ Perlu penanganan manual |
| Persistence | Via SQLAlchemy/jobstore | ✅ Tersimpan di /etc/crontab |
| Overhead | Minimal (in-process) | Process spawn tiap eksekusi |
| Cocok untuk | App Python yang sudah berjalan | Schedule mandiri tanpa app |
Saat menggunakan APScheduler di produksi, pastikan: (1) Gunakan BackgroundScheduler atau AsyncIOScheduler — jangan gunakan BlockingScheduler jika Anda juga perlu menjalankan MQTT client. (2) Aktifkan misfire_grace_time agar tugas tidak melewati deadline jika server sedang sibuk. (3) Gunakan jobstore berbasis database jika perlu persistensi setelah restart.
7. Proyek: Sistem Monitoring Otomatis
Sekarang kita akan menggabungkan semua konsep yang sudah dipelajari menjadi satu proyek nyata: Sistem Monitoring Suhu & Kelembaban Otomatis. Sistem ini akan membaca sensor, mengirim data ke MQTT broker, menjadwalkan pembacaan otomatis, dan mengontrol relay berdasarkan threshold.
Arsitektur Proyek
┌────────────┐ ┌──────────────────────────────────┐ ┌───────────────┐
│ DHT11 ├────►│ RASPBERRY PI 4B │ │ MQTT BROKER │
│ (Sensor) │ │ ├────►│ (Mosquitto) │
└────────────┘ │ ┌─────────────────────────────┐ │ └───────┬───────┘
│ │ main_monitor.py │ │ │
┌────────────┐ │ │ │ │ ┌───────▼───────┐
│ LED │◄────┤ │ 1. Baca sensor (interval) │ │ │ Dashboard / │
│ (Status) │ │ │ 2. Kirim ke MQTT │ │ │ Telegram Bot │
└────────────┘ │ │ 3. Cek threshold │ │ └───────────────┘
│ │ 4. Kontrol relay │ │
┌────────────┐ │ │ 5. Log ke file │ │
│ RELAY │◄────┤ └─────────────────────────────┘ │
│ (Kipas/AC)│ └──────────────────────────────────┘
└────────────┘
Implementasi Lengkap
"""
Sistem Monitoring Suhu & Kelembaban Otomatis
IoTHub - https://iothub.id
Fitur:
- Pembacaan DHT11 berkala via APScheduler
- Publish data ke MQTT broker (paho-mqtt)
- Kontrol relay otomatis berdasarkan threshold
- Logging ke file
- LED status: blink saat error, solid saat normal
"""
import json
import time
import logging
import random # Ganti dengan baca sensor asli
from datetime import datetime
import paho.mqtt.client as mqtt
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
# === KONFIGURASI ===
BROKER_HOST = "localhost"
BROKER_PORT = 1883
CLIENT_ID = "monitor-rpi-01"
TOPIC_DATA = "home/sensor/suhu"
TOPIC_STATUS = "home/status/monitor"
TOPIC_RELAY = "home/actuator/relay"
# Threshold untuk kontrol otomatis
SUHU_ON_THRESHOLD = 30.0 # Suhu di atas ini → nyalakan kipas
SUHU_OFF_THRESHOLD = 27.0 # Suhu di bawah ini → matikan kipas
RELAY_PIN = 17 # GPIO pin relay
LED_PIN = 27 # GPIO pin LED status
INTERVAL = 10 # Detik antara pembacaan
# === LOGGING ===
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("monitor.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("monitor")
# === STATE GLOBAL ===
relay_aktif = False
def baca_sensor():
"""
Baca data dari DHT11.
Ganti bagian simulasi dengan kode sensor asli:
import board, adafruit_dht
sensor = adafruit_dht.DHT11(board.D4)
suhu = sensor.temperature
kelembaban = sensor.humidity
"""
suhu = round(random.uniform(24.0, 34.0), 1)
kelembaban = round(random.uniform(40.0, 80.0), 1)
return suhu, kelembaban
def kontrol_relay(suhu):
"""Kontrol relay berdasarkan threshold suhu."""
global relay_aktif
if suhu >= SUHU_ON_THRESHOLD and not relay_aktif:
relay_aktif = True
logger.warning(f"⚡ RELAY ON — Suhu {suhu}°C ≥ {SUHU_ON_THRESHOLD}°C")
# GPIO.output(RELAY_PIN, GPIO.HIGH) # Aktifkan relay
elif suhu <= SUHU_OFF_THRESHOLD and relay_aktif:
relay_aktif = False
logger.info(f"🔴 RELAY OFF — Suhu {suhu}°C ≤ {SUHU_OFF_THRESHOLD}°C")
# GPIO.output(RELAY_PIN, GPIO.LOW) # Matikan relay
def tugas_pembacaan():
"""Tugas utama: baca sensor → publish → kontrol relay."""
try:
suhu, kelembaban = baca_sensor()
# Susun payload
payload = json.dumps({
"suhu": suhu,
"kelembaban": kelembaban,
"relay": "ON" if relay_aktif else "OFF",
"device": CLIENT_ID,
"timestamp": datetime.now().isoformat()
})
# Publish ke MQTT
client.publish(TOPIC_DATA, payload, qos=1)
logger.info(f"📊 Data terkirim: {payload}")
# Kontrol relay otomatis
kontrol_relay(suhu)
except Exception as e:
logger.error(f"❌ Error pembacaan: {e}")
# === MQTT SETUP ===
def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
logger.info(f"✅ MQTT terhubung ke {BROKER_HOST}")
client.publish(TOPIC_STATUS, "ONLINE", qos=1, retain=True)
# Subscribe ke topik kontrol relay dari dashboard
client.subscribe(f"{TOPIC_RELAY}/command", qos=1)
logger.info(f"📡 Subscribe ke {TOPIC_RELAY}/command")
else:
logger.error(f"❌ MQTT gagal koneksi, rc={rc}")
def on_message(client, userdata, msg):
"""Handler untuk perintah relay dari dashboard."""
command = msg.payload.decode("utf-8")
global relay_aktif
if command == "ON":
relay_aktif = True
logger.info("🔔 Relay diaktifkan via MQTT command")
elif command == "OFF":
relay_aktif = False
logger.info("🔔 Relay dimatikan via MQTT command")
def on_disconnect(client, userdata, rc, properties=None):
if rc != 0:
logger.warning(f"⚠️ MQTT terputus (rc={rc})")
# === INISIALISASI ===
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)
client.will_set(TOPIC_STATUS, "OFFLINE", qos=1, retain=True)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Hubungkan
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# Mulai scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(
func=tugas_pembacaan,
trigger=IntervalTrigger(seconds=INTERVAL),
id="pembacaan_sensor",
name="Pembacaan Sensor & Publish MQTT"
)
scheduler.start()
logger.info(f"📅 Scheduler aktif — interval {INTERVAL} detik")
# === MAIN LOOP ===
logger.info("🚀 Sistem monitoring dimulai!")
logger.info(f" Threshold ON : {SUHU_ON_THRESHOLD}°C")
logger.info(f" Threshold OFF : {SUHU_OFF_THRESHOLD}°C")
try:
# Jalankan MQTT loop di thread utama
client.loop_forever()
except KeyboardInterrupt:
logger.info("\n🛑 Sistem dihentikan oleh user.")
finally:
client.publish(TOPIC_STATUS, "OFFLINE", qos=1, retain=True)
scheduler.shutdown()
client.disconnect()
logger.info("👋 Shutdown selesai.")
Untuk menjalankan sistem monitoring secara permanen di Raspberry Pi, gunakan systemd service atau screen/tmux. Berikut contoh perintah screen: screen -S monitor → jalankan python3 main_monitor.py → tekan Ctrl+A lalu D untuk detach. Gunakan screen -r monitor untuk kembali.
8. Best Practices & Error Handling
Sistem IoT yang berjalan 24/7 membutuhkan error handling yang robust. Berikut praktik terbaik yang wajib diterapkan dalam proyek IoT Python Anda:
8.1 — Error Handling yang Benar
"""
Best Practices Error Handling untuk IoT Python
IoTHub - https://iothub.id
"""
import logging
import time
import json
from functools import wraps
from datetime import datetime
# === DECORATOR: RETRY dengan backoff ===
def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
"""Decorator untuk retry otomatis dengan exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
delay = min(base_delay * (2 ** (retries - 1)), max_delay)
logging.warning(
f"[RETRY] {func.__name__} gagal (attempt {retries}/{max_retries}): {e}"
)
if retries < max_retries:
logging.info(f"[RETRY] Menunggu {delay:.1f}s sebelum retry...")
time.sleep(delay)
else:
logging.error(f"[FATAL] {func.__name__} gagal setelah {max_retries} percobaan!")
raise
return wrapper
return decorator
# === CONTOH PENGGUNAAN ===
@retry_with_backoff(max_retries=3, base_delay=2.0)
def kirim_ke_broker(client, topic, payload):
"""Kirim data ke MQTT broker dengan retry."""
result = client.publish(topic, json.dumps(payload), qos=1)
if result.rc != 0:
raise ConnectionError(f"Publish gagal, rc={result.rc}")
return True
@retry_with_backoff(max_retries=5)
def baca_sensor_dht():
"""Baca sensor DHT dengan retry."""
import board, adafruit_dht
sensor = adafruit_dht.DHT11(board.D4)
suhu = sensor.temperature
if suhu is None:
raise ValueError("Pembacaan sensor mengembalikan None")
return suhu
# === HEALTH CHECK LOGGER ===
class HealthMonitor:
"""Monitor kesehatan sistem IoT."""
def __init__(self):
self.errors = []
self.last_success = datetime.now()
self.max_error_age = 300 # 5 menit
def record_success(self):
self.last_success = datetime.now()
self.errors.clear()
def record_error(self, error_msg):
self.errors.append({
"time": datetime.now().isoformat(),
"error": error_msg
})
if len(self.errors) > 10:
self.errors = self.errors[-10:] # Simpan 10 error terakhir
def is_healthy(self):
"""Cek apakah sistem masih sehat."""
age = (datetime.now() - self.last_success).total_seconds()
return age < self.max_error_age and len(self.errors) < 5
def get_status_report(self):
"""Generate laporan kesehatan."""
age = (datetime.now() - self.last_success).total_seconds()
return {
"healthy": self.is_healthy(),
"last_success_ago": f"{age:.0f}s",
"recent_errors": len(self.errors),
"uptime_ok": age < self.max_error_age
}
# === KONFIGURASI LOGGING YANG BAIK ===
def setup_logging(log_file="iot_system.log"):
"""Setup logging untuk sistem IoT."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
return logging.getLogger("iot")
Checklist Best Practices
| Praktik | Penjelasan | Prioritas |
|---|---|---|
| SELALU pakai try/except | Wrap semua pembacaan sensor dan MQTT publish dalam try/except | 🔴 Tinggi |
| Retry dengan backoff | Jangan retry langsung — gunakan delay eksponensial | 🔴 Tinggi |
| Logging ke file | Catat semua aktivitas ke log file untuk debugging | 🔴 Tinggi |
| LWT MQTT aktif | Gunakan Last Will agar broker tahu jika device offline | 🟡 Sedang |
| Watchdog timer | Gunakan hardware/software watchdog untuk auto-restart | 🟡 Sedang |
| Konfigurasi via file/env | Jangan hardcode kredensial di kode sumber | 🟡 Sedang |
| Graceful shutdown | Pastikan relay aktuator dimatikan saat program berhenti | 🔴 Tinggi |
| Health check endpoint | Exposes endpoint untuk monitoring dari luar | 🟢 Opsional |
1. Hardcode password di kode sumber — gunakan environment variable atau file config terpisah. 2. Menggunakan time.sleep() panjang di main thread — gunakan scheduler atau threading agar MQTT loop tidak ter-block. 3. Tidak ada GPIO.cleanup() — pin GPIO bisa tertinggal di state yang salah setelah program berhenti. 4. Mengabaikan error NaN dari sensor — DHT11 sering mengembalikan None/NaN, pastikan selalu dicek.
9. Quiz: Uji Pemahamanmu!
Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut untuk menguji pemahamanmu tentang otomasi IoT dengan Python: