Python untuk IoT

Otomasi IoT dengan Python: MQTT, GPIO & Scheduling

👑 Premium

Panduan lengkap membangun sistem IoT otomatis dengan Python — komunikasi MQTT, kontrol GPIO, pembacaan sensor, dan penjadwalan tugas otomatis

1. Python untuk IoT

Python telah menjadi salah satu bahasa pemrograman paling populer di dunia, dan dampaknya kini terasa kuat di ekosistem Internet of Things (IoT). Dengan sintaks yang bersih, ekosistem library yang masif, dan dukungan lintas platform, Python menawarkan jalur tercepat dari ide ke prototipe fungsional dalam proyek IoT.

Dalam konteks IoT, Python berperan sebagai "otak" pada perangkat komputasi yang lebih powerful seperti Raspberry Pi, BeagleBone, atau Jetson Nano. Berbeda dari Arduino atau ESP32 yang menggunakan bahasa C/C++, Python memungkinkan pengembang membangun sistem IoT yang kompleks dengan fitur-fitur tingkat tinggi seperti threading, akses file sistem, database, dan jaringan — semuanya dalam satu bahasa.

Mengapa Python untuk IoT?

Keunggulan Penjelasan
Sintaks SederhanaBisa langsung menulis kode tanpa kompilasi panjang, cocok untuk prototyping cepat
Library Kayapaho-mqtt, RPi.GPIO, gpiozero, smbus2, APScheduler — semua tersedia via pip
Cross-PlatformJalan di Linux, macOS, Windows, dan berbagai single-board computer
Komunitas BesarBanyak tutorial, forum, dan contoh proyek IoT berbasis Python
Integrasi MudahMudah terhubung ke REST API, database, MQTT broker, dan layanan cloud
Async Supportasyncio dan library async memungkinkan handling simultan banyak koneksi

Python vs MicroPython: Mana yang Cocok?

Banyak pemula bingung antara Python biasa dan MicroPython. Keduanya memiliki peran berbeda dalam ekosistem IoT:

Aspek Python (CPython) MicroPython
Jalur diPC, Raspberry Pi, SBCESP32, ESP8266, RP2040, mikrokontroler
RAM Minimum~50 MB~256 KB
File SystemFull accessTerbatas (FAT/LittleFS)
Library paho-mqtt✅ Berjalan sempurna❌ Tidak kompatibel langsung
Threading✅ Multithreading penuhLimited
Cocok untukGateway, server, edge computingNode sensor/aktuator ringan
💡 Tips

Tutorial ini berfokus pada Python (CPython) yang berjalan di Raspberry Pi atau PC. Jika Anda ingin memprogram ESP32 langsung dengan Python, pelajari MicroPython secara terpisah. Namun, konsep dasar seperti MQTT dan GPIO tetap berlaku di kedua platform.

Diagram: Arsitektur Sistem IoT dengan Python
┌──────────────────────────────────────────────────────────┐
│                  SISTEM IoT DENGAN PYTHON                │
│                                                          │
│  ┌──────────────┐    ┌──────────────┐   ┌────────────┐  │
│  │   SENSOR     │    │  RASPBERRY   │   │   MQTT     │  │
│  │   DHT11      ├───►│  PI 4B       ├──►│   BROKER   │  │
│  │   BME280     │    │              │   │  (Mosquitto)│  │
│  │   LDR        │    │  Python 3.x  │   └─────┬──────┘  │
│  └──────────────┘    │              │         │          │
│                      │  paho-mqtt   │         │          │
│  ┌──────────────┐    │  RPi.GPIO    │         ▼          │
│  │  AKTUATOR    │◄───│  gpiozero    │   ┌────────────┐  │
│  │  LED / Relay  │    │  APScheduler │   │  DASHBOARD │  │
│  │  Buzzer      │    │              │   │  / Server  │  │
│  │  Motor       │    └──────────────┘   └────────────┘  │
│  └──────────────┘                                        │
└──────────────────────────────────────────────────────────┘

2. Setup Environment Python

Sebelum mulai menulis kode, kita perlu menyiapkan lingkungan kerja yang bersih dan terisolasi. Penggunaan virtual environment adalah praktik terbaik agar dependensi proyek IoT tidak bertabrakan dengan package sistem operasi.

Langkah 1: Instalasi Python 3

Sebagian besar distribusi Linux sudah menyertakan Python 3. Namun, pastikan Anda menggunakan versi 3.9 atau lebih baru:

Bash
# Cek versi Python yang terinstal
python3 --version

# Instalasi di Raspberry Pi OS / Debian / Ubuntu
sudo apt update
sudo apt install python3 python3-pip python3-venv -y

# Instalasi di Fedora / CentOS
sudo dnf install python3 python3-pip -y

Langkah 2: Buat Virtual Environment

Bash
# Buat direktori proyek
mkdir ~/iot-project && cd ~/iot-project

# Buat virtual environment
python3 -m venv venv

# Aktifkan virtual environment
source venv/bin/activate

# Verifikasi
which python3
# Output: /home/pi/iot-project/venv/bin/python3

Langkah 3: Instalasi Library IoT

Berikut daftar library utama yang kita butuhkan beserta fungsinya:

Package Fungsi Instalasi
paho-mqttMQTT client untuk komunikasi pub/subpip install paho-mqtt
RPi.GPIOKontrol GPIO pin Raspberry Pipip install RPi.GPIO
gpiozeroAPI level tinggi untuk GPIO (LED, sensor, aktuator)pip install gpiozero
smbus2Komunikasi I2C untuk sensor seperti BME280pip install smbus2
adafruit-circuitpython-dhtPembacaan sensor DHT11/DHT22pip install adafruit-circuitpython-dht
APSchedulerPenjadwalan tugas otomatispip install APScheduler
requestsHTTP client untuk REST APIpip install requests
Bash
# Instalasi semua library sekaligus
pip install paho-mqtt RPi.GPIO gpiozero smbus2 \
  adafruit-circuitpython-dht APScheduler requests

# Simpan ke requirements.txt
pip freeze > requirements.txt

# Untuk reinstal di perangkat lain:
# pip install -r requirements.txt
⚠️ Peringatan

Library RPi.GPIO hanya berjalan di hardware Raspberry Pi. Jika Anda ingin mengembangkan di PC biasa (Windows/Linux), gunakan mode testing dengan os.environ["MOCK"] = "1" atau gunakan library pigpio yang mendukung remote GPIO. Alternatif lain adalah menulis kode dengan abstraksi agar mudah dipindah ke RPi.

3. MQTT Client dengan paho-mqtt

MQTT (Message Queuing Telemetry Transport) adalah protokol komunikasi ringan berbasis publish/subscribe yang menjadi standar de facto di dunia IoT. Library paho-mqtt adalah implementasi MQTT paling populer untuk Python.

Konsep Dasar MQTT

Diagram: Topologi MQTT Publish/Subscribe
  Publisher 1        ┌──────────────┐        Subscriber A
 (Sensor Suhu) ────►│              │──────► (Dashboard)
                     │ MQTT BROKER  │
  Publisher 2        │  (Mosquitto) │        Subscriber B
 (Sensor Gas)  ────►│              │──────► (Alert System)
                     │              │
  Subscriber C       │              │
  (Telegram Bot) ◄──│              │
                     └──────────────┘

  Topik (Topics):
  ├── home/sensor/suhu     → Publisher 1 → Subscriber A
  ├── home/sensor/gas      → Publisher 2 → Subscriber B
  ├── home/sensor/#        → Subscriber C (wildcard)
  └── home/actuator/lampu  → Subscriber D → Aktuator

3.1 — MQTT Subscriber Sederhana

Subscriber adalah komponen yang mendengarkan pesan dari broker. Mari kita mulai dengan subscriber yang menerima pesan dari topic tertentu:

Python — mqtt_subscriber.py
"""
MQTT Subscriber - Menerima data dari topic IoT
IoTHub - https://iothub.id
"""

import paho.mqtt.client as mqtt

# --- Konfigurasi ---
BROKER_HOST = "localhost"    # Alamat MQTT broker
BROKER_PORT = 1883           # Port default MQTT
CLIENT_ID   = "subscriber-rpi"
TOPIC       = "home/sensor/suhu"


def on_connect(client, userdata, flags, rc, properties=None):
    """Callback saat koneksi ke broker berhasil."""
    if rc == 0:
        print(f"[OK] Terhubung ke MQTT broker ({BROKER_HOST}:{BROKER_PORT})")
        # Subscribe ke topik setelah koneksi berhasil
        client.subscribe(TOPIC, qos=1)
        print(f"[OK] Subscribe ke topik: {TOPIC}")
    else:
        print(f"[ERROR] Gagal koneksi, kode: {rc}")


def on_message(client, userdata, msg):
    """Callback saat menerima pesan dari topik yang di-subscribe."""
    payload = msg.payload.decode("utf-8")
    print(f"[DATA] {msg.topic} → {payload}")


def on_disconnect(client, userdata, rc, properties=None):
    """Callback saat koneksi terputus."""
    if rc != 0:
        print(f"[WARN] Koneksi terputus (rc={rc}), mencoba reconnect...")


# --- Setup Client ---
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)

# Set Last Will and Testament (LWT)
# Pesan ini dikirim broker jika client disconnect mendadak
client.will_set(
    topic="home/status/subscriber",
    payload="OFFLINE",
    qos=1,
    retain=True
)

# Hubungkan callback
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Koneksi ke broker
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

# Mulai loop untuk menerima pesan secara terus-menerus
print("[INFO] Subscriber berjalan... Tekan Ctrl+C untuk berhenti.")
client.loop_forever()

3.2 — MQTT Publisher Sederhana

Publisher adalah komponen yang mengirim pesan ke broker. Berikut contoh publisher yang mengirim data suhu simulasi:

Python — mqtt_publisher.py
"""
MQTT Publisher - Mengirim data sensor ke broker
IoTHub - https://iothub.id
"""

import json
import time
import random
import paho.mqtt.client as mqtt

BROKER_HOST = "localhost"
BROKER_PORT = 1883
CLIENT_ID   = "publisher-sensor"
TOPIC       = "home/sensor/suhu"


def on_connect(client, userdata, flags, rc, properties=None):
    if rc == 0:
        print(f"[OK] Publisher terhubung ke broker")
    else:
        print(f"[ERROR] Gagal koneksi, kode: {rc}")


client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)

# Aktifkan autentikasi jika broker membutuhkan
# client.username_pw_set("iot_user", "rahasia123")

# Set LWT untuk publisher
client.will_set("home/status/publisher", "OFFLINE", qos=1, retain=True)

client.on_connect = on_connect
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

# Publish status online (retain=True agar subscriber baru langsung tahu)
client.publish("home/status/publisher", "ONLINE", qos=1, retain=True)

# Loop publish data
try:
    print("[INFO] Mulai mengirim data...")
    while True:
        # Simulasi pembacaan sensor
        suhu = round(random.uniform(24.0, 35.0), 1)
        kelembaban = round(random.uniform(40.0, 80.0), 1)

        # Buat payload JSON
        payload = json.dumps({
            "suhu": suhu,
            "kelembaban": kelembaban,
            "unit": "celcius",
            "timestamp": int(time.time())
        })

        # Publish dengan QoS 1 (at least once delivery)
        result = client.publish(TOPIC, payload, qos=1)

        print(f"[KIRIM] {payload}")

        time.sleep(5)  # Kirim setiap 5 detik

except KeyboardInterrupt:
    print("\n[INFO] Publisher dihentikan.")
    client.publish("home/status/publisher", "OFFLINE", qos=1, retain=True)
    client.disconnect()
Output Subscriber: [OK] Terhubung ke MQTT broker (localhost:1883) [OK] Subscribe ke topik: home/sensor/suhu [INFO] Subscriber berjalan... Tekan Ctrl+C untuk berhenti. [DATA] home/sensor/suhu → {"suhu": 28.3, "kelembaban": 65.2, "unit": "celcius", "timestamp": 1750416000} [DATA] home/sensor/suhu → {"suhu": 29.1, "kelembaban": 63.8, "unit": "celcius", "timestamp": 1750416005}

3.3 — Memahami QoS (Quality of Service)

Level Nama Penjelasan Cocok untuk
QoS 0At Most OncePesan dikirim sekali tanpa konfirmasi. Bisa hilang.Data non-kritis,高频 update
QoS 1At Least OncePesan dijamin sampai, tapi mungkin duplikatSensor suhu, status aktuator
QoS 2Exactly OncePesan dijamin sampai tepat sekali. Paling lambat.Kritis: alarm, perintah aktuator
ℹ️ Last Will and Testament (LWT)

LWT adalah pesan khusus yang diatur saat client pertama kali connect ke broker. Jika client disconnect secara tidak normal (misalnya mati listrik), broker akan otomatis mempublikasikan LWT ke topik yang ditentukan. Ini sangat berguna untuk memantau status perangkat IoT secara real-time. Pastikan set retain=True agar subscriber baru langsung mengetahui status terakhir.

4. GPIO Control via Python

GPIO (General Purpose Input/Output) adalah pin digital pada Raspberry Pi yang bisa dikonfigurasi sebagai input atau output. Dengan Python, kita bisa mengontrol LED, relay, motor, dan membaca status tombol atau sensor digital.

4.1 — Menggunakan RPi.GPIO

Library RPi.GPIO adalah library klasik yang memberikan kontrol langsung ke pin GPIO Raspberry Pi:

Python — gpio_basic.py
"""
GPIO Control dengan RPi.GPIO
IoTHub - https://iothub.id
"""

import RPi.GPIO as GPIO
import time

# --- Setup Pin ---
LED_PIN    = 17   # GPIO17 (fisik pin 11)
BUTTON_PIN = 27   # GPIO27 (fisik pin 13)

# Gunakan penomoran BOARD atau BCM
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

# Setup pin LED sebagai OUTPUT
GPIO.setup(LED_PIN, GPIO.OUT)

# Setup pin Tombol sebagai INPUT dengan pull-up internal
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# --- Blink LED ---
print("[INFO] Blink LED pada GPIO17...")
for i in range(5):
    GPIO.output(LED_PIN, GPIO.HIGH)  # Nyalakan LED
    print(f"  Blink {i+1}: ON")
    time.sleep(0.5)
    GPIO.output(LED_PIN, GPIO.LOW)   # Matikan LED
    print(f"  Blink {i+1}: OFF")
    time.sleep(0.5)

# --- Baca Tombol ---
print("\n[INFO] Tunggu tekan tombol pada GPIO27...")
try:
    while True:
        # Tombol active-low (ditekan = LOW)
        if GPIO.input(BUTTON_PIN) == GPIO.LOW:
            GPIO.output(LED_PIN, GPIO.HIGH)
            print("[EVENT] Tombol ditekan! LED menyala.")
        else:
            GPIO.output(LED_PIN, GPIO.LOW)
        time.sleep(0.1)

except KeyboardInterrupt:
    print("\n[INFO] Program dihentikan.")

finally:
    GPIO.cleanup()  # Selalu bersihkan GPIO saat selesai!

4.2 — Menggunakan gpiozero (API Level Tinggi)

Library gpiozero menyediakan API yang jauh lebih bersih dan Pythonic. Sangat direkomendasikan untuk proyek baru:

Python — gpiozero_example.py
"""
GPIO Control dengan gpiozero (lebih Pythonic)
IoTHub - https://iothub.id
"""

from gpiozero import LED, Button, Buzzer
from signal import pause
import time

# --- Inisialisasi Komponen ---
led     = LED(17)          # LED pada GPIO17
button  = Button(27)       # Tombol pada GPIO27
buzzer  = Buzzer(22)       # Buzzer pada GPIO22

# --- Blink LED dengan method gpiozero ---
print("[INFO] Blink LED dengan gpiozero...")
led.blink(on_time=0.5, off_time=0.5, n=3, background=False)

# --- Event-driven: Tombol menyalakan LED ---
print("[INFO] Mode event-driven. Tekan tombol untuk LED ON.")

button.when_pressed = lambda: (led.on(), print("[EVENT] LED ON"))
button.when_released = lambda: (led.off(), print("[EVENT] LED OFF"))

# Jalankan event loop
pause()
💡 Perbandingan RPi.GPIO vs gpiozero

RPi.GPIO: Cocok jika Anda terbiasa dengan gaya pemrograman Arduino/low-level. Anda harus manual mengatur setup, input/output, dan cleanup. gpiozero: Lebih cocok untuk pemula karena menggunakan pendekatan object-oriented — cukup buat objek LED, panggil method-nya. Keduanya bisa dipakai bersamaan, tapi hindari mengontrol pin yang sama dari kedua library secara bersamaan.

Pin Mapping Raspberry Pi (BCM Mode)

Fungsi Pin BCM Pin Fisik Catatan
LED AGPIO 17Pin 11Cocok untuk output LED/relay
Tombol AGPIO 27Pin 13Input dengan pull-up internal
BuzzerGPIO 22Pin 15Output aktif HIGH
Sensor IRGPIO 23Pin 16Input digital dari sensor IR
SPI CE0GPIO 8Pin 24Pin SPI chip enable
I2C SDAGPIO 2Pin 3Data line I2C
I2C SCLGPIO 3Pin 5Clock line I2C

5. Membaca Data Sensor dengan Python

Salah satu kebutuhan utama dalam proyek IoT adalah membaca data dari sensor fisik. Pada bagian ini, kita akan mempelajari cara membaca sensor DHT11/DHT22 (suhu & kelembaban) dan BME280 (suhu, kelembaban & tekanan udara) menggunakan Python di Raspberry Pi.

5.1 — Membaca Sensor DHT11/DHT22

Python — sensor_dht.py
"""
Pembacaan Sensor DHT11/DHT22 dengan Python
IoTHub - https://iothub.id

Rangkaian:
  DHT11 Pin 1 (VCC) → 3.3V Raspberry Pi
  DHT11 Pin 2 (DATA) → GPIO4 (Pin 7) + Resistor 10kΩ ke 3.3V
  DHT11 Pin 4 (GND) → GND Raspberry Pi
"""

import time
import board
import adafruit_dht

# --- Inisialisasi Sensor ---
# Untuk DHT11 gunakan adafruit_dht.DHT11(board.D4)
# Untuk DHT22 gunakan adafruit_dht.DHT22(board.D4)
sensor = adafruit_dht.DHT11(board.D4)

# Interval pembacaan (detik)
interval = 5


def baca_sensor():
    """Membaca data dari sensor DHT dan mengembalikan dict."""
    try:
        suhu = sensor.temperature       # Celsius
        kelembaban = sensor.humidity     # Persen

        if suhu is None or kelembaban is None:
            return None

        return {
            "suhu": round(suhu, 1),
            "kelembaban": round(kelembaban, 1),
            "timestamp": int(time.time())
        }

    except RuntimeError as e:
        # Error umum DHT: CRC check failed
        print(f"[WARN] Sensor error: {e}")
        return None
    except Exception as e:
        print(f"[ERROR] Error tak terduga: {e}")
        return None


def main():
    print("=== Pembaca Sensor DHT11 ===")
    print(f"Membaca setiap {interval} detik...\n")

    try:
        while True:
            data = baca_sensor()

            if data:
                print(f"🌡️  Suhu       : {data['suhu']} °C")
                print(f"💧 Kelembaban : {data['kelembaban']} %")
                print(f"⏰ Timestamp  : {data['timestamp']}")
                print("---")
            else:
                print("[SKIP] Pembacaan gagal, coba lagi...\n")

            time.sleep(interval)

    except KeyboardInterrupt:
        print("\n[INFO] Program dihentikan.")
    finally:
        sensor.exit()


if __name__ == "__main__":
    main()
Output Terminal: === Pembaca Sensor DHT11 === Membaca setiap 5 detik... 🌡️ Suhu : 28.5 °C 💧 Kelembaban : 72.0 % ⏰ Timestamp : 1750416000 --- 🌡️ Suhu : 28.0 °C 💧 Kelembaban : 73.5 % ⏰ Timestamp : 1750416005 ---

5.2 — Membaca Sensor BME280 via I2C (smbus2)

Sensor BME280 menggunakan protokol I2C dan mengukur suhu, kelembaban, dan tekanan udara. Berikut cara membacanya langsung menggunakan library smbus2:

Python — sensor_bme280.py
"""
Pembacaan Sensor BME280 via I2C dengan smbus2
IoTHub - https://iothub.id

Rangkaian:
  BME280 VCC → 3.3V
  BME280 GND → GND
  BME280 SDA → GPIO2 (Pin 3)
  BME280 SCL → GPIO3 (Pin 5)
  BME280 SDO → GND (Alamat I2C: 0x76)
"""

import time
import struct
from smbus2 import SMBus

# Alamat default BME280 (jika SDO ke GND = 0x76, ke VCC = 0x77)
BME280_ADDR = 0x76

# Register BME280
REG_ID          = 0xD0
REG_RESET       = 0xE0
REG_CTRL_HUM    = 0xF2
REG_CTRL_MEAS   = 0xF4
REG_CONFIG      = 0xF5
REG_DATA_START  = 0xF7

# Kalibrasi data register
CALIB_26_41     = 0xE1
CALIB_00_25     = 0x88
CALIB_00_25_LEN = 26
CALIB_26_41_LEN = 7


class BME280:
    """Driver sederhana untuk sensor BME280."""

    def __init__(self, bus_number=1, address=BME280_ADDR):
        self.bus = SMBus(bus_number)
        self.addr = address

        # Verifikasi ID chip (harus 0x60)
        chip_id = self.bus.read_byte_data(self.addr, REG_ID)
        if chip_id != 0x60:
            raise RuntimeError(f"BME280 tidak terdeteksi! Chip ID: {chip_id:#x}")

        # Baca data kalibrasi
        self._load_calibration()

        # Konfigurasi: oversampling, mode normal
        self.bus.write_byte_data(self.addr, REG_CTRL_HUM, 0x05)    # Humidity x16
        self.bus.write_byte_data(self.addr, REG_CTRL_MEAS, 0xB7)   # Temp x16, Press x16, Normal mode
        self.bus.write_byte_data(self.addr, REG_CONFIG, 0xA0)       # Standby 1000ms

    def _load_calibration(self):
        """Baca register kalibrasi untuk kompensasi suhu & tekanan."""
        cal26 = self.bus.read_i2c_block_data(self.addr, CALIB_26_41, CALIB_26_41_LEN)
        cal00 = self.bus.read_i2c_block_data(self.addr, CALIB_00_25, CALIB_00_25_LEN)

        # Kompensasi suhu
        self.dig_T1 = struct.unpack(' 2047:
            self.dig_H4 -= 4096
        self.dig_H5 = (cal26[6] << 4) | ((cal26[5] >> 4) & 0x0F)
        if self.dig_H5 > 2047:
            self.dig_H5 -= 4096
        self.dig_H6 = struct.unpack('> 4)
        raw_temp  = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        raw_hum   = (data[6] << 8)  | data[7]
        return raw_temp, raw_press, raw_hum

    def read_compensated(self):
        """Baca dan kompensasi data suhu, tekanan, kelembaban."""
        raw_temp, raw_press, raw_hum = self.read_raw()

        # Kompensasi suhu (dari datasheet BME280)
        var1 = (raw_temp / 16384.0 - self.dig_T1 / 1024.0) * self.dig_T2
        var2 = ((raw_temp / 131072.0 - self.dig_T1 / 8192.0) ** 2) * self.dig_T3
        t_fine = var1 + var2
        suhu = t_fine / 5120.0

        # Kompensasi kelembaban
        h = t_fine - 76800.0
        h = (raw_hum - (self.dig_H4 * 64.0 + self.dig_H5 / 16384.0 * h)) * \
            (self.dig_H2 / 65536.0 * (1.0 + self.dig_H6 / 67108864.0 * h *
            (1.0 + self.dig_H3 / 67108864.0 * h)))
        kelembaban = max(0.0, min(h, 100.0))

        return round(suhu, 2), round(kelembaban, 2)


# --- Program Utama ---
if __name__ == "__main__":
    sensor = BME280()
    print("=== BME280 Sensor Reader ===\n")

    try:
        while True:
            suhu, kelembaban = sensor.read_compensated()
            print(f"🌡️  Suhu       : {suhu} °C")
            print(f"💧 Kelembaban : {kelembaban} %")
            print("---")
            time.sleep(3)

    except KeyboardInterrupt:
        print("\n[INFO] Selesai.")
ℹ️ Mengapa smbus2 langsung?

BME280 juga bisa dibaca menggunakan library adafruit-circuitpython-bme280 yang lebih sederhana. Namun, contoh di atas menunjukkan cara membaca register langsung menggunakan smbus2 agar Anda memahami cara kerja komunikasi I2C di level register. Untuk proyek produksi, gunakan library tingkat tinggi untuk kemudahan.

6. Scheduling & Cron Jobs

Sistem IoT yang andal membutuhkan penjadwalan tugas otomatis — misalnya mengirim laporan setiap jam, membersihkan log setiap tengah malam, atau menjalankan kalibrasi sensor secara berkala. Python menawarkan beberapa pilihan untuk ini.

6.1 — APScheduler: Penjadwalan di dalam Aplikasi Python

APScheduler adalah library scheduling paling fleksibel untuk Python. Berbeda dari cron, APScheduler berjalan di dalam proses Python dan mendukung tiga jenis scheduler: interval (periodik), date (sekali di waktu tertentu), dan cron (polanya mirip system cron).

Python — scheduler_example.py
"""
APScheduler - Penjadwalan Tugas Otomatis untuk IoT
IoTHub - https://iothub.id
"""

import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("iot-scheduler")


def baca_sensor_suhu():
    """Tugas periodik: Baca data suhu setiap 10 detik."""
    import random
    suhu = round(random.uniform(25.0, 32.0), 1)
    timestamp = datetime.now().strftime("%H:%M:%S")
    logger.info(f"[SENSOR] Suhu terbaca: {suhu} °C")


def kirim_laporan_harian():
    """Tugas cron: Kirim laporan setiap jam 08:00 pagi."""
    logger.info("[LAPORAN] Mengirim laporan harian ke server...")


def bersihkan_log():
    """Tugas cron: Bersihkan log file setiap tengah malam."""
    logger.info("[CLEANUP] Membersihkan log lama...")


def kalibrasi_sensor():
    """Tugas interval: Kalibrasi sensor setiap 6 jam."""
    logger.info("[KALIBRASI] Menjalankan kalibrasi sensor BME280...")


# --- Setup Scheduler ---
scheduler = BackgroundScheduler()

# Tugas 1: Baca sensor setiap 10 detik
scheduler.add_job(
    func=baca_sensor_suhu,
    trigger=IntervalTrigger(seconds=10),
    id="baca_sensor",
    name="Pembacaan Sensor Suhu",
    replace_existing=True
)

# Tugas 2: Laporan harian jam 08:00
scheduler.add_job(
    func=kirim_laporan_harian,
    trigger=CronTrigger(hour=8, minute=0),
    id="laporan_harian",
    name="Laporan Harian"
)

# Tugas 3: Bersihkan log tengah malam
scheduler.add_job(
    func=bersihkan_log,
    trigger=CronTrigger(hour=0, minute=0),
    id="bersihkan_log",
    name="Pembersihan Log"
)

# Tugas 4: Kalibrasi setiap 6 jam
scheduler.add_job(
    func=kalibrasi_sensor,
    trigger=IntervalTrigger(hours=6),
    id="kalibrasi",
    name="Kalibrasi Sensor"
)

# Mulai scheduler
scheduler.start()
logger.info("[OK] Scheduler dimulai dengan 4 tugas terjadwal.")

# Tampilkan jadwal yang aktif
for job in scheduler.get_jobs():
    logger.info(f"  📋 {job.name} → next run: {job.next_run_time}")

# Jalankan selama 30 detik untuk demo
try:
    logger.info("[INFO] Scheduler berjalan selama 30 detik...")
    time.sleep(30)
except KeyboardInterrupt:
    pass

scheduler.shutdown()
logger.info("[INFO] Scheduler dihentikan.")

6.2 — Perbandingan: APScheduler vs System Cron

Fitur APScheduler System Cron
Cara kerjaDi dalam proses PythonOS-level, menjalankan script baru
Interaksi antar tugas✅ Bisa berbagi state❌ Terisolasi
Misfire handling✅ Grace period & retry❌ Perlu penanganan manual
PersistenceVia SQLAlchemy/jobstore✅ Tersimpan di /etc/crontab
OverheadMinimal (in-process)Process spawn tiap eksekusi
Cocok untukApp Python yang sudah berjalanSchedule mandiri tanpa app
⚠️ Penting untuk Produksi

Saat menggunakan APScheduler di produksi, pastikan: (1) Gunakan BackgroundScheduler atau AsyncIOScheduler — jangan gunakan BlockingScheduler jika Anda juga perlu menjalankan MQTT client. (2) Aktifkan misfire_grace_time agar tugas tidak melewati deadline jika server sedang sibuk. (3) Gunakan jobstore berbasis database jika perlu persistensi setelah restart.

7. Proyek: Sistem Monitoring Otomatis

Sekarang kita akan menggabungkan semua konsep yang sudah dipelajari menjadi satu proyek nyata: Sistem Monitoring Suhu & Kelembaban Otomatis. Sistem ini akan membaca sensor, mengirim data ke MQTT broker, menjadwalkan pembacaan otomatis, dan mengontrol relay berdasarkan threshold.

Arsitektur Proyek

Diagram: Alur Sistem Monitoring Otomatis
┌────────────┐     ┌──────────────────────────────────┐     ┌───────────────┐
│   DHT11    ├────►│        RASPBERRY PI 4B           │     │  MQTT BROKER  │
│  (Sensor)  │     │                                  ├────►│  (Mosquitto)  │
└────────────┘     │  ┌─────────────────────────────┐ │     └───────┬───────┘
                   │  │    main_monitor.py           │ │             │
┌────────────┐     │  │                             │ │     ┌───────▼───────┐
│   LED      │◄────┤  │  1. Baca sensor (interval)  │ │     │  Dashboard /  │
│  (Status)  │     │  │  2. Kirim ke MQTT            │ │     │  Telegram Bot │
└────────────┘     │  │  3. Cek threshold            │ │     └───────────────┘
                   │  │  4. Kontrol relay             │ │
┌────────────┐     │  │  5. Log ke file               │ │
│   RELAY    │◄────┤  └─────────────────────────────┘ │
│  (Kipas/AC)│     └──────────────────────────────────┘
└────────────┘

Implementasi Lengkap

Python — main_monitor.py
"""
Sistem Monitoring Suhu & Kelembaban Otomatis
IoTHub - https://iothub.id

Fitur:
- Pembacaan DHT11 berkala via APScheduler
- Publish data ke MQTT broker (paho-mqtt)
- Kontrol relay otomatis berdasarkan threshold
- Logging ke file
- LED status: blink saat error, solid saat normal
"""

import json
import time
import logging
import random  # Ganti dengan baca sensor asli
from datetime import datetime

import paho.mqtt.client as mqtt
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

# === KONFIGURASI ===
BROKER_HOST    = "localhost"
BROKER_PORT    = 1883
CLIENT_ID      = "monitor-rpi-01"
TOPIC_DATA     = "home/sensor/suhu"
TOPIC_STATUS   = "home/status/monitor"
TOPIC_RELAY    = "home/actuator/relay"

# Threshold untuk kontrol otomatis
SUHU_ON_THRESHOLD  = 30.0   # Suhu di atas ini → nyalakan kipas
SUHU_OFF_THRESHOLD = 27.0   # Suhu di bawah ini → matikan kipas

RELAY_PIN = 17   # GPIO pin relay
LED_PIN   = 27   # GPIO pin LED status
INTERVAL  = 10   # Detik antara pembacaan

# === LOGGING ===
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("monitor.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("monitor")

# === STATE GLOBAL ===
relay_aktif = False


def baca_sensor():
    """
    Baca data dari DHT11.
    Ganti bagian simulasi dengan kode sensor asli:
      import board, adafruit_dht
      sensor = adafruit_dht.DHT11(board.D4)
      suhu = sensor.temperature
      kelembaban = sensor.humidity
    """
    suhu = round(random.uniform(24.0, 34.0), 1)
    kelembaban = round(random.uniform(40.0, 80.0), 1)
    return suhu, kelembaban


def kontrol_relay(suhu):
    """Kontrol relay berdasarkan threshold suhu."""
    global relay_aktif

    if suhu >= SUHU_ON_THRESHOLD and not relay_aktif:
        relay_aktif = True
        logger.warning(f"⚡ RELAY ON — Suhu {suhu}°C ≥ {SUHU_ON_THRESHOLD}°C")
        # GPIO.output(RELAY_PIN, GPIO.HIGH)  # Aktifkan relay

    elif suhu <= SUHU_OFF_THRESHOLD and relay_aktif:
        relay_aktif = False
        logger.info(f"🔴 RELAY OFF — Suhu {suhu}°C ≤ {SUHU_OFF_THRESHOLD}°C")
        # GPIO.output(RELAY_PIN, GPIO.LOW)   # Matikan relay


def tugas_pembacaan():
    """Tugas utama: baca sensor → publish → kontrol relay."""
    try:
        suhu, kelembaban = baca_sensor()

        # Susun payload
        payload = json.dumps({
            "suhu": suhu,
            "kelembaban": kelembaban,
            "relay": "ON" if relay_aktif else "OFF",
            "device": CLIENT_ID,
            "timestamp": datetime.now().isoformat()
        })

        # Publish ke MQTT
        client.publish(TOPIC_DATA, payload, qos=1)
        logger.info(f"📊 Data terkirim: {payload}")

        # Kontrol relay otomatis
        kontrol_relay(suhu)

    except Exception as e:
        logger.error(f"❌ Error pembacaan: {e}")


# === MQTT SETUP ===
def on_connect(client, userdata, flags, rc, properties=None):
    if rc == 0:
        logger.info(f"✅ MQTT terhubung ke {BROKER_HOST}")
        client.publish(TOPIC_STATUS, "ONLINE", qos=1, retain=True)

        # Subscribe ke topik kontrol relay dari dashboard
        client.subscribe(f"{TOPIC_RELAY}/command", qos=1)
        logger.info(f"📡 Subscribe ke {TOPIC_RELAY}/command")
    else:
        logger.error(f"❌ MQTT gagal koneksi, rc={rc}")


def on_message(client, userdata, msg):
    """Handler untuk perintah relay dari dashboard."""
    command = msg.payload.decode("utf-8")
    global relay_aktif

    if command == "ON":
        relay_aktif = True
        logger.info("🔔 Relay diaktifkan via MQTT command")
    elif command == "OFF":
        relay_aktif = False
        logger.info("🔔 Relay dimatikan via MQTT command")


def on_disconnect(client, userdata, rc, properties=None):
    if rc != 0:
        logger.warning(f"⚠️ MQTT terputus (rc={rc})")


# === INISIALISASI ===
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, CLIENT_ID)
client.will_set(TOPIC_STATUS, "OFFLINE", qos=1, retain=True)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Hubungkan
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

# Mulai scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(
    func=tugas_pembacaan,
    trigger=IntervalTrigger(seconds=INTERVAL),
    id="pembacaan_sensor",
    name="Pembacaan Sensor & Publish MQTT"
)
scheduler.start()
logger.info(f"📅 Scheduler aktif — interval {INTERVAL} detik")

# === MAIN LOOP ===
logger.info("🚀 Sistem monitoring dimulai!")
logger.info(f"   Threshold ON  : {SUHU_ON_THRESHOLD}°C")
logger.info(f"   Threshold OFF : {SUHU_OFF_THRESHOLD}°C")

try:
    # Jalankan MQTT loop di thread utama
    client.loop_forever()

except KeyboardInterrupt:
    logger.info("\n🛑 Sistem dihentikan oleh user.")

finally:
    client.publish(TOPIC_STATUS, "OFFLINE", qos=1, retain=True)
    scheduler.shutdown()
    client.disconnect()
    logger.info("👋 Shutdown selesai.")
💡 Menjalankan di Background

Untuk menjalankan sistem monitoring secara permanen di Raspberry Pi, gunakan systemd service atau screen/tmux. Berikut contoh perintah screen: screen -S monitor → jalankan python3 main_monitor.py → tekan Ctrl+A lalu D untuk detach. Gunakan screen -r monitor untuk kembali.

8. Best Practices & Error Handling

Sistem IoT yang berjalan 24/7 membutuhkan error handling yang robust. Berikut praktik terbaik yang wajib diterapkan dalam proyek IoT Python Anda:

8.1 — Error Handling yang Benar

Python — error_handling.py
"""
Best Practices Error Handling untuk IoT Python
IoTHub - https://iothub.id
"""

import logging
import time
import json
from functools import wraps
from datetime import datetime

# === DECORATOR: RETRY dengan backoff ===
def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
    """Decorator untuk retry otomatis dengan exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    delay = min(base_delay * (2 ** (retries - 1)), max_delay)
                    logging.warning(
                        f"[RETRY] {func.__name__} gagal (attempt {retries}/{max_retries}): {e}"
                    )
                    if retries < max_retries:
                        logging.info(f"[RETRY] Menunggu {delay:.1f}s sebelum retry...")
                        time.sleep(delay)
                    else:
                        logging.error(f"[FATAL] {func.__name__} gagal setelah {max_retries} percobaan!")
                        raise
        return wrapper
    return decorator


# === CONTOH PENGGUNAAN ===
@retry_with_backoff(max_retries=3, base_delay=2.0)
def kirim_ke_broker(client, topic, payload):
    """Kirim data ke MQTT broker dengan retry."""
    result = client.publish(topic, json.dumps(payload), qos=1)
    if result.rc != 0:
        raise ConnectionError(f"Publish gagal, rc={result.rc}")
    return True


@retry_with_backoff(max_retries=5)
def baca_sensor_dht():
    """Baca sensor DHT dengan retry."""
    import board, adafruit_dht
    sensor = adafruit_dht.DHT11(board.D4)
    suhu = sensor.temperature
    if suhu is None:
        raise ValueError("Pembacaan sensor mengembalikan None")
    return suhu


# === HEALTH CHECK LOGGER ===
class HealthMonitor:
    """Monitor kesehatan sistem IoT."""

    def __init__(self):
        self.errors = []
        self.last_success = datetime.now()
        self.max_error_age = 300  # 5 menit

    def record_success(self):
        self.last_success = datetime.now()
        self.errors.clear()

    def record_error(self, error_msg):
        self.errors.append({
            "time": datetime.now().isoformat(),
            "error": error_msg
        })
        if len(self.errors) > 10:
            self.errors = self.errors[-10:]  # Simpan 10 error terakhir

    def is_healthy(self):
        """Cek apakah sistem masih sehat."""
        age = (datetime.now() - self.last_success).total_seconds()
        return age < self.max_error_age and len(self.errors) < 5

    def get_status_report(self):
        """Generate laporan kesehatan."""
        age = (datetime.now() - self.last_success).total_seconds()
        return {
            "healthy": self.is_healthy(),
            "last_success_ago": f"{age:.0f}s",
            "recent_errors": len(self.errors),
            "uptime_ok": age < self.max_error_age
        }


# === KONFIGURASI LOGGING YANG BAIK ===
def setup_logging(log_file="iot_system.log"):
    """Setup logging untuk sistem IoT."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger("iot")

Checklist Best Practices

Praktik Penjelasan Prioritas
SELALU pakai try/exceptWrap semua pembacaan sensor dan MQTT publish dalam try/except🔴 Tinggi
Retry dengan backoffJangan retry langsung — gunakan delay eksponensial🔴 Tinggi
Logging ke fileCatat semua aktivitas ke log file untuk debugging🔴 Tinggi
LWT MQTT aktifGunakan Last Will agar broker tahu jika device offline🟡 Sedang
Watchdog timerGunakan hardware/software watchdog untuk auto-restart🟡 Sedang
Konfigurasi via file/envJangan hardcode kredensial di kode sumber🟡 Sedang
Graceful shutdownPastikan relay aktuator dimatikan saat program berhenti🔴 Tinggi
Health check endpointExposes endpoint untuk monitoring dari luar🟢 Opsional
🚫 Kesalahan Umum yang Harus Dihindari

1. Hardcode password di kode sumber — gunakan environment variable atau file config terpisah. 2. Menggunakan time.sleep() panjang di main thread — gunakan scheduler atau threading agar MQTT loop tidak ter-block. 3. Tidak ada GPIO.cleanup() — pin GPIO bisa tertinggal di state yang salah setelah program berhenti. 4. Mengabaikan error NaN dari sensor — DHT11 sering mengembalikan None/NaN, pastikan selalu dicek.

9. Quiz: Uji Pemahamanmu!

Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut untuk menguji pemahamanmu tentang otomasi IoT dengan Python:

Pertanyaan 1: Level QoS mana yang menjamin pesan sampai minimal sekali (at least once), meski berpotensi duplikat?

a) QoS 0
b) QoS 1
c) QoS 2
d) QoS 3

Pertanyaan 2: Apa fungsi Last Will and Testament (LWT) dalam MQTT?

a) Menyimpan pesan ke dalam database broker
b) Mengenkripsi semua pesan yang dikirim client
c) Membuat client reconnect secara otomatis saat disconnect
d) Mengirim pesan otomatis dari broker jika client disconnect mendadak

Pertanyaan 3: Library apa yang disarankan untuk GPIO Raspberry Pi karena API-nya yang Pythonic dan object-oriented?

a) gpiozero
b) serial
c) numpy
d) flask

Pertanyaan 4: Trigger mana pada APScheduler yang menjalankan tugas secara periodik dengan interval tetap?

a) CronTrigger
b) DateTrigger
c) IntervalTrigger
d) OnceTrigger

Pertanyaan 5: Praktik terbaik apa yang harus digunakan saat retry koneksi MQTT yang gagal?

a) Retry langsung tanpa delay
b) Exponential backoff dengan delay yang meningkat
c) Cukup satu kali percobaan saja
d) Menutup program dan menjalankan ulang dari awal