본문 바로가기

라즈베리파이 피코 W를 사용한 MQTT 테스트(AWS IoT Core 사용)

by rudals.kim 2022. 12. 16. 댓글 개
반응형

AWS(Amazon Web Services)의 IoT Core 서비스를 사용하여 라즈베리파이 피코 W와 MQTT 테스트를 해 보았습니다.

아래는 Publish와 Subscribe의 topic 메시지를 사용하여 피코 W의 내장 LED를 on/off 동작 테스트 이미지입니다.

MQTT 테스트에 앞서 먼저 사전에 준비 작업이 필요합니다. 이 게시글에서는 MicroPython 코드를 사용하여 실제적으로 MQTT가 동작되는 부분만 설명되어 있으니 만약  아래 게시글에서 아직 설정하지 않은 부분이 있으시다면 먼저 읽어 보신 후 사전 설정을 해두시는 편이 좋을 것 같습니다. 

 

1. MicroPython 설치방법으로 아래 게시글 아래쪽의 기본 동작 테스트 부분을 참고하시면 됩니다.

 

라즈베리파이 피코 W 구매 및 기본 동작 테스트

라즈베리파이 피코를 사용하여 여러 가지를 기능 테스트와 관련 PCB를 제작하여 동작시켜 보았습니다. 라즈베리파이 피코 테스트 보드 제작 라즈베리파이 피코 보드와 다른 디바이스들을 연동

rudalskim.tistory.com

2. AWS IoT Core 서비스를 사용하기 위한 가입관련 게시글입니다.

 

AWS(Amazon Web Services) 가입하기

MQTT관련 클라우드 서비스를 검색하다가 AWS(Amazon Web Services)라는 것을 알게 되었습니다. AWS(Amazon Web Services)는 현재(2022년 1분기) 가장 많은 점유율을 가지고 있으며 가상으로 서비스를 생성하여 EC2

rudalskim.tistory.com

3. AWS IoT Core 서비스를 사용하여 MQTT 설정관련 게시글입니다.

 

MQTT사용을 위한 AWS(Amazon Web Services) IoT Core 설정하기

AWS(Amazon Web Services) IoT Core를 사용하여 MQTT 서비스를 이용하기 위해서는 아래와 같은 절차가 필요합니다. docs.aws.amazon.com의 개발자 안내서의 내용인데 자세하게 잘 설명이 되어 있습니다. AWS IoT 리

rudalskim.tistory.com

4. 아래 설명될 MicroPython 소스코드에서 사용될 AWS 인증서 변환 관련 게시글입니다.

 

OpenSSL을 사용한 AWS 인증서 포맷(PEM, DER) 변환하기

AWS(Amazon Web Services)의 IoT Core 서비스를 사용하여 MQTT를 테스트하게 되었는데 AWS에서 제공되는 인증서 파일들이 PEM 포맷으로 제공됩니다. AWS에서 인증서 다운로드 방법은 아래 게시글을 참고하시

rudalskim.tistory.com

MicroPython을 사용하여 MQTT 테스트를 하기 위해서는 umqtt.simple 라이브러리를 다운로드 받아야 합니다.

아래 공식 홈페이지를 참고하여 라이브러리 소스를 다운로드합니다.

 

micropython-umqtt.simple

Lightweight MQTT client for MicroPython.

pypi.org

다운로드를 한 압축파일을 해제하면 micropython-umqtt.simple-1.3.4/umptt/simple.py 파일이 있습니다.

Thonny를 실행 후 메뉴 -> View -> Files 메뉴를 선택한 후 위쪽 부분의  'This computer'에서 압축 해제된 umqtt 디렉터리로 이동합니다. 아랫부분에는 'Raspberry Pi Pico' 디렉터리가 보입니다. 위부분의 umqtt에서 마우스 오른쪽 버튼을 클릭하면 'Upload to /' 메뉴를 클릭하여 umqtt 라이브러리를 피코 W 로 복사해 넣습니다.

마찬가지 방법으로 AWS 인증서를 아래와 같은 디렉토리 구조로 피코 W에 복사해 넣습니다.(위쪽 4번 게시글 링크 참고)

MQTT를 사용하기 위한 사전작업을 마쳤습니다. 

실제 테스트는 아직 하지도 않았는데 MQTT 사용을 위한 사전 작업에 더 많은 시간을 들인것 같습니다.

 

사전 작업을 마쳤으니 이제 실제 AWS를 사용해 보도록 하겠습니다.

Python 소스코드를 단계별로 나눠 살펴보겠습니다.

아래 코드는 필요한 라이브러리와 관련 파라미터들을 설정하는 부분입니다.

from machine import Pin
import network
from umqtt.simple import MQTTClient
import time
from secrets import secrets

SSID = secrets['ssid']
PASS = secrets['password']

AWS_ENDPOINT = secrets['aws_endpoint']
CLIENT_ID = "aws_iot"
PUBLISH_CHANNEL=b'test/topic'
SUBSCRIBED_CHANNEL=b'test/topic'

counter=0
debounce_time=0

 

위 코드를 보면 secrets['변수명'] 인 부분이 있습니다.

이 부분은 WIFI 접속 이름, WIFI 암호, AWS 접속 주소 등 공개되지 않을 부분들을 모아 하나의 파일로 작성하였습니다.

형식은 아래와 같이 작성하여 피코 W의 루트 디렉터리에 secrets.py로 저장하면 됩니다.

secrets = {
   'ssid' : '와이파이 접속 이름',
   'password' : '와이파이 접속 암호',
   'aws_endpoint' : 'AWS 접속 주소'
}

 

aws_endpoint 주소는 AWS IoT의 설정에 적혀있습니다. 이 주소를 복사해 넣으면 됩니다.

다음은 인증서를 읽는 부분인데 피코 W의 루트 디렉터리의 certs 디렉터리에서 DER 포맷의 인증서를 읽어 들입니다.

def get_ssl_params():
    keyfile = '/certs/private.der'
    with open(keyfile, 'rb') as f:
        key = f.read()
    certfile = "/certs/certificate.der"
    with open(certfile, 'rb') as f:
        cert = f.read()    
    ssl_params = {'key': key,'cert': cert, 'server_side': False}
    return ssl_params

 

피코 W 보드에서 subscribe 메시지를 수신 시 동작되는 callback 함수로 'on', 'off' 메시지 수신시 내장 LED를 ON/OFF 시키며 이외의 메시지 수신시 출력 창에 보여줍니다.

def mqtt_callback(topic, msg):
    print("received data:")
    print("topic: %s message: %s" %(topic, msg))
    if topic==SUBSCRIBED_CHANNEL:
        led = Pin("LED", Pin.OUT)
        if msg==b'on':
            led.on()
        elif msg==b'off':
            led.off()
        else:
            print(msg)

 

라즈베리파이 피코 W에서 인터넷을 사용하기 위해 공유기에 연결 후 연결 확인하는 부분입니다.

def check_wifi(wlan):
    while not wlan.isconnected():
        time.sleep_ms(500)
        print(".")
        wlan.connect( SSID, PASS )
    if not wlan.isconnected():
        print("not connected")
    if wlan.isconnected():
        print("connected")

wlan = network.WLAN( network.STA_IF )
wlan.active( True )
check_wifi(wlan)

 

AWS endpoint에 연결 후 'test/topic' 토픽을 publish, subscribe 하는 부분입니다.

아래 예제는 매 5초마다 "Hello World! msg from picow #" 메시지를 publish 하는 코드입니다.

mqtt = MQTTClient( CLIENT_ID, AWS_ENDPOINT, port = 8883, keepalive = 10000, ssl = True, ssl_params = ssl_params )
mqtt.set_callback(mqtt_callback)
mqtt.connect()

mqtt.subscribe(SUBSCRIBED_CHANNEL)

print("Waiting for messages...")
while True:
    if ((time.ticks_ms()-debounce_time) > 5000):
        counter+=1
        debounce_time=time.ticks_ms()        
        msg_str = "Hello World! msg from picow {}".format(counter)
        mqtt.publish( topic = PUBLISH_CHANNEL, msg = msg_str , qos = 0 )
    mqtt.check_msg()

 

아래는 제가 AWS의 IoT Core 서비스를 사용하여 MQTT 테스트한 전체 소스 코드입니다.

from machine import Pin
import network
from umqtt.simple import MQTTClient
import time
from secrets import secrets

SSID = secrets['ssid']
PASS = secrets['password']

AWS_ENDPOINT = secrets['aws_endpoint']
CLIENT_ID = "aws_iot"
PUBLISH_CHANNEL=b'test/topic'
SUBSCRIBED_CHANNEL=b'test/topic'

counter=0
debounce_time=0

def get_ssl_params():
    keyfile = '/certs/private.der'
    with open(keyfile, 'rb') as f:
        key = f.read()
    certfile = "/certs/certificate.der"
    with open(certfile, 'rb') as f:
        cert = f.read()    
    ssl_params = {'key': key,'cert': cert, 'server_side': False}
    return ssl_params
    

def mqtt_callback(topic, msg):
    print("received data:")
    print("topic: %s message: %s" %(topic, msg))
    if topic==SUBSCRIBED_CHANNEL:
        led = Pin("LED", Pin.OUT)
        if msg==b'on':
            led.on()
        elif msg==b'off':
            led.off()
        else:
            print(msg)
  
def check_wifi(wlan):
    while not wlan.isconnected():
        time.sleep_ms(500)
        print(".")
        wlan.connect( SSID, PASS )
    if not wlan.isconnected():
        print("not connected")
    if wlan.isconnected():
        print("connected")

wlan = network.WLAN( network.STA_IF )
wlan.active( True )
check_wifi(wlan)
ssl_params = get_ssl_params()

mqtt = MQTTClient( CLIENT_ID, AWS_ENDPOINT, port = 8883, keepalive = 10000, ssl = True, ssl_params = ssl_params )
mqtt.set_callback(mqtt_callback)
mqtt.connect()

mqtt.subscribe(SUBSCRIBED_CHANNEL)

print("Waiting for messages...")
while True:
    if ((time.ticks_ms()-debounce_time) > 5000):
        counter+=1
        debounce_time=time.ticks_ms()        
        msg_str = "Hello World! msg from picow {}".format(counter)
        mqtt.publish( topic = PUBLISH_CHANNEL, msg = msg_str , qos = 0 )
    mqtt.check_msg()

 

Thonny에서 위 소스코드를 실행해 보면 Shell 창에 아래와 같은 로그가 나옵니다.

제 경우 Pulish와 Subscribe 모두 하나의 'test/topic' 토픽 이름으로 설정하여 피코 W에서 publish 된 메시지 역시 subscribe되고 있습니다.

AWS IoT의 테스트 -> MQTT 테스트 클라이언트에서 'test/topic'을 구독해 보면 아래와 같이 피코 W에서 보낸 메시지가 출력됩니다.

주제 게시란에 on을 적고 게시 버튼을 클릭하면 피코 W의 LED가 켜지고, off를 적고 게시하면 LED가 꺼집니다.

간단히 AWS IoT Core 서비스를 사용하여 MQTT 기본 테스트를 해 보았습니다.

이것을 잘 응용하여 온도센서로 온도를 읽어 들여 MQTT로 전송한다거나 홈 오토메이션으로 가정의 전등을 켜고 끈다거나 여러 가지로 응용해 보셔도 좋을 것 같습니다.

반응형

댓글