[作品提交] 【得捷电子Follow me第2期】番茄日历钟

ltpop   2023-11-15 01:33 楼主
  • WechatIMG9.jpg
  • WechatIMG8.jpg
  • WechatIMG10.jpg

回复评论 (5)

1 来自 2楼 ltpop 

一、介绍视频


 

二、项目总结

笔者是一个智能家居爱好者,玩过HomeAssistant、ESPHome、Tasmota等平台,并使用过ESP8266、ESP32自已DIY设备。通常我只使用现有平台进行配置,很少直接编写硬件代码。非常感谢DigiKey和EEWORLD举办的Follow me活动,让我有机会玩转智能设备的底层代码。

和大部分同学一样,我选择了CircuitPython作为开发平台和语言。社区资源非常丰富,开发门槛较低,调试方便,非常适合像我这样的新手。为了完成活动的要求并实现一个完整的功能,我选择了制作一个日历+天气+番茄钟功能。这个功能可以在日常的工作学习中,运用番茄工作法提高效率。

这是我第一次接触CircuitPython,完成项目的同时也学习了CircuitPython。我的方法是先确定项目目标,即需要实现的最小功能。然后针对每个功能参考官方示例并对相关库用法进行学习,并同时进行代码调试。各功能代码测试通过后,将其合并为最终完整的代码。

项目中的功能点较多,除了活动要求的WIFI配置、网络请求、中文显示、Neopixel LED控制外,还包含了屏幕控制、协程、触摸传感器等的使用。其中的难点是屏幕控制和协程。作为一个CircuitPython新手,目前完成的项目代码仅仅是实现了功能,仍有很大的优化空间。后面还会继续探索实践,欢迎大家一起交流。

项目目标:

  1. 完成活动要求
    1. 网络数据请求
    2. 中文显示
    3. Neopixel控制
  2. 实现番茄日历功能:
    1. 日期时间显示,并显示农历
    2. 天气显示
    3. 番茄钟控制功能
      1. 可设置番茄时长
      2. 显示番茄倒计时
      3. 完成后LED提醒并确认
      4. 番茄个数统计
  3. 实现方法
    1. WIFI连接至互联网
    2. 从NTP服务获取日期和时间
    3. 从开放日历API获取农历日期
    4. 从开放天气API获取当地天气
    5. 使用协程函数和共享变量的方法进行数据更新和屏幕显示控制
    6. 使用ESP32S3的触摸传感器进行输入控制,执行番茄钟的启动和确认
    7. 使用Neopixel LED的不同颜色和状态表示设备的状态,如联网状态、番茄钟运行、确认阶段等
  4. 项目成品展示
    1. 1.jpg

       

    2. 2.jpg

       

  5. 完成帖子:【得捷电子Follow me第2期】番茄日历钟 - DigiKey得捷技术专区 - 电子工程世界-论坛 (eeworld.com.cn)

活动任务

Follow me 第2期!与得捷电子一起解锁开发板超能力! (eeworld.com.cn)

任务1:控制屏幕显示中文

完成屏幕的控制,并且能显示中文

任务2:网络功能使用

完成网络功能的使用,能够创建热点和连接到WiFi

任务3:控制WS2812B

使用按键控制板载Neopixel LED的显示和颜色切换

任务4:分任务1:日历&时钟

完成一个可通过互联网更新的万年历时钟,并显示当地的天气信息

项目材料

ESP32S3 TFT Overview | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System

环境准备

安装CircuitPython

双击reset键,将.uf2文件复制到FTHRS2BOOT磁盘中,会自动重启并完成安装

使用Mu editor或VSCode+插件连接开发板,进入代码开发,两者都很方便进入REPL.模式进行调试。Mu连接更稳定,VSCode代码编写更强大。

开发流程一般是找相似的官方示例,在REPL模式中进行代码片断调试,测试成功后再写入code.py文件中,进行完整测试。

正常模型下修改了code.py文件内容后,开发板会自动重启载入代码。但如果在REPL模式下,需要手动重置开发板,除了按板子上的RST按钮外,还可以使用Ctrl+D或执行以下代码来重置。

import microcontroller;
microcontroller.reset();
 

安装Lib

代码中使用的库,一部分已经内置,可以直接import。否则就需要手动安装,官方提供的Lib链接为Libraries (circuitpython.org)Adafruit_CircuitPython_Bundle/libraries/drivers at main · adafruit/Adafruit_CircuitPython_Bundle (github.com),可进入下载与系统版本对应的包进行安装,如 adafruit-circuitpython-bundle-8.x-mpy-20231010.zip

安装方法很简单,解压后在lib目录中找到相应的.mpy文件,并拷贝至 CIRCUITPY 磁盘中lib目录下即可。如果需要源码,也可下载原始py文件。

功能点实现方法

中文字体显示(任务1)

官方示例:Introduction — Adafruit Bitmap_Font Library 1.0 documentation (circuitpython.org)

需要额外Lib:adafruit_display_text、adafruit_bitmap_font

显示中文的关键点在于找到中文字体,参考网友的代码使用了这里的开源字体:carrothu-cn/chinese-bitmap-fonts (github.com)。因为开发板空间限制,选择了大小合适的wenquanyi_13px.pcf,保存在 CIRCUITPY 磁盘的font目录下。

核心代码

import board
import displayio
import terminalio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font

display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)

group = displayio.Group()
# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)

display.root_group = group
 

在项目中的农历和天气信息均可正常显示中文内容,如下图(上中部和左下角位置)

1.jpg

连接WIF(任务2)

这个比较简单,无需额外的lib。但最好需要多次重试连接,避免上电时连接失败。

核心代码

import os
import wifi
import time

# 连接wifi
# 事先在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>

state = wifi.radio.connected
failure_count = 0
while not wifi.radio.connected:
    try:
        wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
    except OSError as error:
        print("Failed to connect, retrying\n", error)
        failure_count += 1
    time.sleep(5)
 

项目中在不同的联网状态下可进行不同的展示,如未联网时显示如下(左下角提醒未连WIFI)

4.jpg

控制WS2812B(任务3)

官方示例:Adafruit CircuitPython NeoPixel — Adafruit CircuitPython NeoPixel Library 1.0 documentation

需要额外Lib:neopixel、adafruit_led_animation

NeoPixel LED的使用方法非常灵活,官方有很多特效库,可完成各种效果。项目中使用不同的颜色指示当前开发板状态,如番茄钟计时显示为红色常量,番茄钟倒计时确认中显示为青蓝色闪烁等等。

核心代码

import board
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink

pixel_pin = board.NEOPIXEL
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)

# 红色常亮
pixels.fill((255, 0, 0))
pixels.show()

# 显示为青蓝色闪烁
blink = Blink(pixels, 0.5, color.CYAN)
blink.animate()
 

项目中在番茄确认时,LED灯为表蓝色闪烁,以便提醒用户进行番茄确认。

2.jpg

获取日期时间(任务4)

官方示例:Adafruit_Learning_System_Guides/Raspberry_Pi_Azure_IoT_Hub_Dashboard/qtPyEsp32S2_co2/code.py at 6e9b42bf65613e926cbcd5d2e3ddf19f7c4cbb54 · adafruit/Adafruit_Learning_System_Guides (github.com)

需要额外Lib:adafruit_ntp、adafruit_requests

因为开发板没有连接时钟模块,需要从NTP服务器获取时间,官方有现成的库可调用,为了稳定可使用国内的NTP服务器。NTP返回的数据中有日期和时间,为了方便获取农历日期,采用API的方式来获取,与天气API类似,需要注册账号使用API KEY来访问。

核心代码

import os
import wifi
import time
import ssl
import socketpool
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_requests库
import adafruit_requests as requests

tz_offset=os.getenv("TIME_ZONE")
ntp = None
while True:
    # 使用前需要先联网
    # 获取当前ntp时间,
    if not ntp:
        pool = socketpool.SocketPool(wifi.radio)
        try:
            ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset)
        except OSError as error:
            print("NTP failed, retrying\n", error)
            pass

    # 获取阴历
    pool = socketpool.SocketPool(wifi.radio)
    https = requests.Session(pool, ssl.create_default_context())
    lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
    lunar_json = lunar_response.json()
    if lunar_json['code'] == 200:
        lunar_data = lunar_json['data']
        week_day = lunar_data['week_no']
        week_name = lunar_data['week_name']
        lunar_year_ganzhi = lunar_data['week_name']
        lunar_month_chinese = lunar_data['lunar_month_chinese']
        lunar_day_chinese = lunar_data['lunar_day_chinese']

        print(f"当前农历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
        print(f"当前星期:{week_name}")
    # 仅在第二天时再更新
    dt = ntp.datetime
    print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
    wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
    time.sleep(wait_seconds)

项目中屏幕顶部依次为日期、农历、时间显示,如下图

2.jpg

 

天气信息获取(任务4)

 

官方示例:EEWORLDLINKTK4,需要先注册账号,获得API KEY才能使用。

密码类信息可统一保存在根目录下的settings.toml文件中,格式为<变量名> = <值>,然后在代码中使用os.getenv("<变量名>")来获取,安全方便。

为了方便修改城市信息,也将天气城市设置在toml文件,或者通过当前网络的外网IP获取到当地城市,可访问相关API获取到,坐标可参考代码。

核心代码:

import os
import wifi
import time
import ssl
import socketpool
# 需要导入adafruit_requests库
import adafruit_requests as requests

# 获取天气信息
# 获取天气API KEY
weather_api_key = os.getenv("WEATHER_API_KEY")
# 获取天气城市:从配置文件中读取城市设置
weather_city = os.getenv("WEATHER_CITY")
while True:
    # 创建https
    pool = socketpool.SocketPool(wifi.radio)
    https = requests.Session(pool, ssl.create_default_context())
    # 如果读取不到配置的城市,则获取当前IP城市
    if not weather_city:
        # 获取当前外网IP和城市
        ip_city_response = https.get("https://myip.ipip.net/json")
        ip_city_json = ip_city_response.json()
        if ip_city_json["ret"] == "ok":
            weather_city = ip_city_json['data']['location'][2]
            print(f"当前IP城市:{weather_city}")

    # 当前天气
    weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
    weather_now_response = https.get(weather_now_url)
    weather_json = weather_now_response.json()
    if weather_json["results"]:
        now_weather = weather_json["results"][0]["now"]["text"]
        now_temperature = weather_json["results"][0]["now"]["temperature"]
    print(f"当前天气:{now_weather},气温:{now_temperature}℃")
    time.sleep(300) #5分钟更新一次
 

项目中天气和气温信息展示在屏幕底部,如下图

7.jpeg

屏幕显示控制

官网示例链接:Introduction | CircuitPython Display Support Using displayio | Adafruit Learning System

需要额外Lib:board、displayio、terminalio

屏幕控制分为两个部分,第一部分为画为屏幕的展示布局,可使用官方提供的displayio库来实现;第二部分为控制显示内容,基本思路就是在每次循环中更新各元素的展示内容,如文字、数字、图片等。

该部分涉及到项目中的wifi连接、网络时间、天气等处理,而且需要同步控制屏幕显示。代码行数较多,因为番倒计时每秒更新屏幕内容,目前的代码执行时屏幕显示不是很顺畅,偶尔会卡住1-2秒,代码仍需要优化。

屏幕布局中各组件的从属关系如下图所示,简单说就是Display包含Group,Group中包含Bitmap。

circuitpython_bitmap.png?1556576503

 

屏幕坐标从左上角开始

circuitpython_coord_sys.png?1555378384

 

bitmap_label中anchor_point为元素的参照位置点,值含义如下,如放置在屏幕左上角的元素可设置为(0,0),右下角的可设置为(1,1),具体可参考核心代码:

lcds___displays_text_bound.png?1614363536

 

核心代码:

import board
import displayio
import terminalio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font

display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)

group = displayio.Group()

# 设置日期显示(左上角)
date_label = bitmap_label.Label(terminalio.FONT, scale=1)
date_label.anchor_point = (0.0, 0.0)
date_label.anchored_position = (5, 5)
date_label.text = "2023-11-11"
group.append(date_label)


# 设置时间显示(右上角)
time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
time_label.anchor_point = (1.0, 0.0)
time_label.anchored_position = (display.width - 2, 2)
time_label.text = "11:30"
group.append(time_label)

# 设置农历显示(上中部)
lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
lunar_label.anchor_point = (0.5, 0.0)
lunar_label.anchored_position = (display.width // 2, 5)
lunar_label.text = "十月初一"
group.append(lunar_label)


# 设置天气显示(左下角)
weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
weather_label.anchor_point = (0.0, 1.0)
weather_label.anchored_position = (2, display.height - 5)
weather_label.text = "晴"
group.append(weather_label)

# 设置气温显示(下中部)
temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
temperature_label.anchor_point = (0.5, 1.0)
temperature_label.anchored_position = (display.width // 2, display.height - 5)
temperature_label.text = "0℃"
group.append(temperature_label)


# 设置番茄钟倒计时显示(中间)
pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
# 显示位置
pomodoro_label.anchor_point = (0.5, 0.5)
pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
pomodoro_label.text = "15:00"
group.append(pomodoro_label)

# 设置倒番茄钟统计显示(右下角)
count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
# 显示位置
count_label.anchor_point = (1, 1)
count_label.anchored_position = (display.width - 2, display.height - 2)
count_label.text = "0"
group.append(count_label)

os.getenv("POMODORO_TIMEOUT")

# 定义main_group
main_group = displayio.Group()
main_group.append(group)
# 只能有一个main_group
display.root_group = main_group

# 更新屏幕内容,以下为示例代码,需要环境变量,无法直接执行
while True:
    timeout = 0.1
    if internet.state and ntp_datetime.ntp:
        dt = ntp_datetime.ntp.datetime
        text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
        # 设置日期文本
        date_label.text = f"{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday}"
        # 设置时间文本
        time_label.text = f"{dt.tm_hour}:{dt.tm_min}"
        # 设置农历文本
        lunar_label.text = f"{lunar_month_chinese}月{lunar_day_chinese}"

    if weather.text:
        # 设置天气文本
        weather_label.text = f"{weather.text}"
        # 设置气温文本
        temperature_label.text = f"{weather.temperature}℃"

    else:
        text = f'请先连接WIFI'
        weather_label.text = text

    await asyncio.sleep(timeout)
 

效果图

7.jpeg

多任务并发控制

官方示例:Concurrent Tasks | Cooperative Multitasking in CircuitPython with asyncio | Adafruit Learning System

需要额外Lib:asyncio、adafruit_ticks

项目中需要控制不同的网络请求,并需要在屏幕展示不同的内容,在同一个循环中实现会非常复杂。需要使用并发控制将各事件进行解耦,方便代码编写。即Python中协程异步IO(asyncio),参考官方的文档,该实现可总结为以下几个步骤

  • 在普通函数前添加async关键字,将其转为协程函数
  • 使用await asyncio.sleep代替time.sleep进行延迟,释放对控制器的独占
  • 定义好各协程函数后,使用 asyncio.create_task(some_coroutine(arg1, arg2, ...))对其进行调用
  • 使用 await asyncio.gather(task1, task2, ...)等待各协程任务完成
  • 再次强调不要忘记使用 await

核心代码:这里是官方示例,实际代码可参考下面完整的项目源码

# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
#
# SPDX-License-Identifier: MIT

import asyncio
import board
import digitalio
import keypad


class Interval:
    """Simple class to hold an interval value. Use .value to to read or write."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def monitor_interval_buttons(pin_slower, pin_faster, interval):
    """Monitor two buttons: one lengthens the interval, the other shortens it.
    Change interval.value as appropriate.
    """
    # Assume buttons are active low.
    with keypad.Keys(
        (pin_slower, pin_faster), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                if key_event.key_number == 0:
                    # Lengthen the interval.
                    interval.value += 0.1
                else:
                    # Shorten the interval.
                    interval.value = max(0.1, interval.value - 0.1)
                print("interval is now", interval.value)
            # Let another task run.
            await asyncio.sleep(0)


async def blink(pin, interval):
    """Blink the given pin forever.
    The blinking rate is controlled by the supplied Interval object.
    """
    with digitalio.DigitalInOut(pin) as led:
        led.switch_to_output()
        while True:
            led.value = not led.value
            await asyncio.sleep(interval.value)


async def main():
    interval1 = Interval(0.5)
    interval2 = Interval(1.0)

    led1_task = asyncio.create_task(blink(board.D1, interval1))
    led2_task = asyncio.create_task(blink(board.D2, interval2))
    interval1_task = asyncio.create_task(
        monitor_interval_buttons(board.D3, board.D4, interval1)
    )
    interval2_task = asyncio.create_task(
        monitor_interval_buttons(board.D5, board.D6, interval2)
    )

    await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)


asyncio.run(main())

 

ESP32S3触摸传感器使用

官方示例:CircuitPython Cap Touch | CircuitPython Essentials | Adafruit Learning System

ESP32S3中内置了多个触摸传感器,可以无需外接设备即可实现输入控制,本项目中用于番茄钟的开始触发和确认功能。使用时可将支持触摸传感器的GPIO接一根导线或直接触摸PCB板。具体哪个GPIO支持触摸,可通过执行以下官方代码给出清单。

Capacitive Touch | Adafruit ESP32-S3 TFT Feather | Adafruit Learning System

执行结果为:

# 只显示支持触摸的GPIO
Touch on: A4
Touch on: A5
Touch on: TX
Touch on: D10
Touch on: D11
Touch on: D12
Touch on: LED
Touch on: RX
Touch on: D5
Touch on: D6
Touch on: D9

测试结果为11个,但经测试并不是每个都能使用,如其中的A4、A5一直都是触发状态,原因不明,有了解的小伙伴欢迎评论。项目中使用了D10,位于屏幕上侧中部,经使用测试,触摸反应很灵敏。

核心代码

import board
import touchio
import time

touch = touchio.TouchIn(board.D10)
while True:
    if touch.value:
        # 按钮开始番茄计时
        print("start")
    time.sleep(0.1)

多设备MQTT通信

本次活动一起下单的还有其他两块ESP32的单片机,用来模拟多设备通信。

本实验使用MQTT来实现多设备间通信,具体设备清单如下:

设备1:本次项目的主控,Adafruit ESP32-S3 TFT Feather,用来模拟中控面板,用来显示灯的开关状态。

设备2:另一款ESP32S3的开发板,AtomS3 Lite ESP32S3,用来模拟智能灯

设备3:ESP32C3模块,esp32-c3-wroom-02,经过简单的外围电路焊接,可用来模拟智能开关

效果展示:

设备1:显示灯的状态,右下角on和off

 

7.jpeg

 

8.jpeg

设备2:使用其背面的LED来模拟智能灯,如下图

9.jpeg

 

10.jpeg

 

设备3:模拟智能开关,模组添加了轻触按键,用来模拟开关。

简单说下模组的接线。因为这款是ESP32C3模组,无法直接使用,这里焊接了必要的外围和开关,通过USB2TTL模块接入,如下图。除了3V3、GND、TXD、RXD直接接入USB2TTL外,EN、IO2、IO8需要接高电平,这里各串联3个10K电阻接到3V3,按键开关两端接在IO9和GND,加电时按下可进行固件刷写,否则正常启动。(条件有限,仅用来做实验)

12.jpeg

 

11.jpeg

 

环境搭建

在本地搭建MQTT服务器,用的是mosquitto,流程比较简单,教程也比较多,不再赘述。

MQTT的设计如下:

定义了两个mqtt的topic:cmd/d2/led1和state/d2/led1,其中cmd为发送开灯命令,state为灯的状态更新。d2为设备,本实验中只涉及到设备2,定义为d2,led1为设备中的具体模块,本例中为led1。

其中设备1订阅state/d2/led1主题,当接收的消息为1时表示灯已开,为0时表示灯已关,对应屏幕上显示on和off。

设备2订阅cmd/d2/led1主题,当接收的消息为1时表示开灯操作(0为关机操作,2为切换操作),此时将本机的led1打开,并同时发送消息到主题state/d2/led1,消息内容为灯的具体状态0或1。同时也可通过设备本机的按键来进行开关灯操作,同样也发送相应的mqtt消息。

设备3监听本机按键操作,当按键按下时发送消息到主题cmd/d2/led1,消息内容为2,表示切换开灯操作。

核心代码

设备1和设备2都是ESP32S3芯片,直接使用CircuitPython环境;设备3为ESP32C3,使用了microPython环境。

设备1(中控端,负责显示灯状态)

# 此处为核心代码,完整代码见项目源码
import displayio
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT

# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0-关,1-关,2-切换
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)


# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    group = displayio.Group()

    # 设置MQTT状态(下中右部)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

	# 其他代码省略,完整代码见项目源码
	
    # 创建根group
    main_group = displayio.Group()
    main_group.append(group)
    # 展示
    display.root_group = main_group

    while True:
        # MQTT LED状态
        if mqtt_client.mqtt.is_connected:
            if mqtt_client.led:
                # 亮灯显示为红色
                led_text =  "on"
                mqtt_label.color=0xFF0000
            else:
                led_text =  "off"
                mqtt_label.color=0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text

		# 其他代码省略,完整代码见项目源码
        await asyncio.sleep(pomodoro.wait)

设备2(智能灯)

import board
import os
import wifi
import socketpool
from digitalio import DigitalInOut, Direction, Pull
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# 需要导入neopixel库
import neopixel
# 需要导入asyncio、adafruit_ticks库
import asyncio


# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.cmd_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0-关,1-关,2-切换
        print(f"topic: {topic}")
        if topic == self.cmd_topic:
            print(f"message: {message}")
            if self.led:
                if message == "0":
                    self.led.power = False
                elif message == "1":
                    self.led.power = True
                elif message == "2":
                    self.led.power = not self.led.power


# LED
class LED:
    def __init__(self):
        self.pixels = None
        self.power = False
        self.wait = 0.05



# 连接wifi
async def wifi_connect(internet):
    # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # 如果wifi没有正常连接,则切换为ap模式
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)

# 按钮点击检测
async def monitor_buttons(led):
    button = DigitalInOut(board.BTN)
    while True:
        if not button.value:
            # 按钮按下,防抖
            # await asyncio.sleep(led.wait)
            # if not button.value:
            print("button pressed")
            led.power = not led.power
        await asyncio.sleep(led.wait)

# led控制
async def pixels_led(led, mqtt_client):
    if not led.pixels:
        # Neopixel LED定义
        pixel_pin = board.NEOPIXEL
        pixel_num = 1
        pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0, auto_write=True)
        pixels.fill((255, 255, 255))
        led.pixels = pixels

    last_power = None
    mqtt_client.led = led
    while True:
        if led.power:
            pixels.brightness = 1
        else:
            pixels.brightness = 0
        
        if last_power != led.power:
            print("led toggle")
            last_power = led.power
            print(mqtt_client)
            if mqtt_client.is_connected:
                print(mqtt_client.state_topic)  
                mqtt_client.mqtt.publish(mqtt_client.state_topic, 1 if led.power else 0)

        await asyncio.sleep(led.wait)

async def main():
    # 共享变量设置
    mqtt_client = MqttClient()
    led = LED()

    # 协程函数定义
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    monitor_buttons_task = asyncio.create_task(monitor_buttons(led))
    pixels_led_task = asyncio.create_task(pixels_led(led,mqtt_client))

    # 启动协程
    await asyncio.gather(mqtt_task, monitor_buttons_task, pixels_led_task)


asyncio.run(main())

设备3(智能开关)

import ujson
import network
from umqtt.simple import MQTTClient
from machine import Pin
from time import sleep_ms

# 保存 Wi-Fi MQTT 配置到配置文件
def save_wifi_config(ssid, password, mqtt_host, mqtt_port, mqtt_user, mqtt_password):
    config = {
        'wifi_ssid': ssid,
        'wifi_password': password,
        'mqtt_host': mqtt_host,
        'mqtt_port': mqtt_port,
        'mqtt_user': mqtt_user,
        'mqtt_password': mqtt_password
    }
    with open('config.json', 'w') as f:
        ujson.dump(config, f)

# 从配置文件中读取 Wi-Fi 配置
def load_wifi_config():
    try:
        with open('config.json', 'r') as f:
            config = ujson.load(f)
            return config['wifi_ssid'], config['wifi_password']
    except OSError:
        return None, None
    
# 从配置文件中读取 MQTT 配置
def load_mqtt_config():
    try:
        with open('config.json', 'r') as f:
            config = ujson.load(f)
            return config['mqtt_host'], config['mqtt_port'], config['mqtt_user'], config['mqtt_password']
    except OSError:
        return None, None, None, None

# 示例用法:保存 Wi-Fi 配置到配置文件
# wifi_ssid = 'your_wifi_ssid'
# wifi_password = 'your_wifi_password'
# mqtt_host = ''
# mqtt_port = ''
# mqtt_user = ''
# mqtt_password = ''

# save_wifi_config(wifi_ssid, wifi_password, mqtt_host, mqtt_port, mqtt_user, mqtt_password)

# 示例用法:从配置文件中读取 Wi-Fi 配置
# saved_wifi_ssid, saved_wifi_password = load_wifi_config()  


# 设置 Wi-Fi 连接
wifi_ssid, wifi_password = load_wifi_config()
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(wifi_ssid, wifi_password)

# 设置 MQTT 客户端
mqtt_broker, mqtt_port, mqtt_username, mqtt_password = load_mqtt_config()
mqtt_client_id = 'd2'
mqtt_topic = 'cmd/d1/led1'

mqtt_client = MQTTClient(mqtt_client_id, mqtt_broker, port=mqtt_port, user=mqtt_username, password=mqtt_password)
mqtt_client.connect()



# 设置按键引脚
button_pin = Pin(9, Pin.IN, Pin.PULL_UP)

# 按键防抖延迟时间(毫秒)
debounce_delay = 100


# 检测按键状态并处理按下事件
def handle_button_press():
    if not button_pin.value():
        sleep_ms(debounce_delay)
        if not button_pin.value():
            # 当按键按下时,发送 MQTT 消息
            mqtt_client.publish(mqtt_topic, '2')

while True:
    handle_button_press()

项目最终完整代码

终于完成了🧗‍♂️,完整代码如下

"""
番茄日历钟
ltpop@163.com
20231126
"""

# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_imageload库
import adafruit_imageload
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT


# 当前设备联网状态
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0

# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0-关,1-关,2-切换
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True

# 当前天气状态
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 3600.0


# 当前日期时间状态
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.ntp_sync = None
        self.next_lunar_request_time = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


# 番茄统计
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        self.time = os.getenv("POMODORO_TIME")
        self.timeout = os.getenv("POMODORO_TIMEOUT")
        self.wait = 0.2


# 连接wifi
async def wifi_connect(internet):
    # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # 如果wifi没有正常连接,则切换为ap模式
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)


# 获取时间
async def fetch_time(internet, ntp_datetime):
    tz_offset = os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
            ntp_ok = False
            lunar_ok = False
            pool = socketpool.SocketPool(wifi.radio)
            # 获取当前ntp时间,
            if not ntp:
                try:
                    ntp = adafruit_ntp.NTP(
                        pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
                    # 更新系统时间
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                    ntp_datetime.ntp_sync = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
                    
            lunar_need_update = False
            if not ntp_datetime.next_lunar_request_time:
                lunar_need_update = True
            elif time.time() > ntp_datetime.next_lunar_request_time:
                # 超过下一次请求时间
                lunar_need_update = True
                
            if lunar_need_update:
                # 获取阴历
                try:
                    https = requests.Session(pool, ssl.create_default_context())
                    lunar_response = https.get(
                        f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                except RuntimeError as error:
                    print("Lunar request failed, retrying\n", error)
                    lunar_response = None
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            ntp_datetime.week_day = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(
                            f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
                        # 设置下一次更新时间为第二天0点
                        current_time = time.localtime()
                        ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
            # 仅在第二天时再更新
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                # dt = time.localtime()
                # print(
                #     f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
                await asyncio.sleep(ntp_datetime.wait)
        else:
            await asyncio.sleep(internet.wait)

# 获取天气信息
async def fetch_weather(internet, weather):
    # 获取天气API KEY
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # 获取天气城市:从配置文件中读取城市设置
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # 天气信息
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
            # 如果读取不到配置的城市,则获取当前IP城市
            if not weather_city:
                # 获取当前外网IP和城市
                try:
                    ip_city_response = https.get("https://myip.ipip.net/json")
                except RuntimeError as error:
                    print("IP city request failed, retrying\n", error)    
                if ip_city_response:
                    ip_city_json = ip_city_response.json()
                    if ip_city_json["ret"] == "ok":
                        weather_city = ip_city_json['data']['location'][2]
                        print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # 当前天气
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            try:
                weather_now_response = https.get(weather_now_url)
            except RuntimeError as error:
                print("Weather request failed, retrying\n", error)
                weather_now_response = None
            if weather_now_response:
                weather_json = weather_now_response.json()
                if weather_json["results"]:
                    now_weather = weather_json["results"][0]["now"]["text"]
                    now_temperature = weather_json["results"][0]["now"]["temperature"]
                weather.text = now_weather
                weather.temperature = now_temperature
                print(f"当前天气:{now_weather},气温:{now_temperature}℃")

            # 未来天气预报
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# led显示
async def pixels_led(internet, pomodoro):
    # Neopixel LED控制
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(
        pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    blink = Blink(pixels, 0.5, color.GREEN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        # 番茄进行中,显示为红色常亮
        if pomodoro.run:
            # 番茄等待确认中,显示为绿色闪烁
            if pomodoro.confirming:
                blink.speed = 0.5
                blink.color = color.GREEN
                blink.animate()
            else:
                pixels.fill((255, 0, 0))
                pixels.show()
        elif not internet.state:
            blink.speed = 2
            blink.color = color.RED
            blink.animate()
        else:
            # 否则显示为彩虹色
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # 中文字体文件放在font目录下
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # 设置日期显示(左上角)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)

    # 设置时间显示(右上角)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # 设置农历显示(上中部)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)

    # 设置天气显示(左下角)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # 设置气温显示(下中部)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (
        display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)

    # 设置MQTT状态(下中右部)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

    # 设置番茄钟倒计时显示(中间)
    pomodoro_label = bitmap_label.Label(
        terminalio.FONT, color=0xFF00FF, scale=7)
    # 显示位置
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (
        display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)


    # 设置倒番茄钟统计显示(右下角)
    # with open("img/tomato.bmp", "rb") as f:
    #     image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
    #     sprite = displayio.TileGrid(image, pixel_shader=palette)
    #     group.append(sprite)

    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # 显示位置
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

    # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
    os.getenv("POMODORO_TIMEOUT")

    # 创建根group
    main_group = displayio.Group()
    main_group.append(group)
    # 展示
    display.root_group = main_group

    while True:
        if ntp_datetime.ntp_sync:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # 设置日期文本
            date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            if date_label.text != date_text:
                date_label.text = date_text
            # 设置时间文本
            time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            if time_label.text != time_text:
                time_label.text = time_text
            # 设置农历文本
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
                if lunar_label.text != lunar_text:
                    lunar_label.text = lunar_text

        if weather.text:
            # 设置天气文本
            if weather_label.text != weather.text:
                weather_label.text = weather.text
            
            # 设置气温文本
            temperature_text = f"{weather.temperature}℃"
            if temperature_label.text != temperature_text:
                temperature_label.text = temperature_text
        else:
            weather_text = f'请先连接WIFI'
            if weather_label.text != weather_text:
                weather_label.text = weather_text

        # MQTT LED状态
        if mqtt_client.mqtt.is_connected:
            if mqtt_client.led:
                # 亮灯显示为红色
                led_text =  "on"
                mqtt_label.color=0xFF0000
            else:
                led_text =  "off"
                mqtt_label.color=0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text
        # 更新番茄钟
        pomodoro_label.color = 0x00FFFF
        count_text = f"{pomodoro.count}"
        if count_label.text != count_text:
            count_label.text = count_text
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # 倒计时每秒更新一次
                pomodoro_text = f"{minute:02d}:{second:02d}"
                if pomodoro_label.text != pomodoro_text:
                    pomodoro_label.text = pomodoro_text
            else:
                # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    weather_label.text = f'番茄等待确认'
                    # 超时时显示为红色
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        await asyncio.sleep(pomodoro.wait)


async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            await asyncio.sleep(0.1)
            if touch.value:
                # 按钮开始番茄计时
                if not pomodoro.run:
                    pomodoro.run = True
                    pomodoro.end = time.monotonic() + pomodoro.time
                # 番茄确认状态时,将番茄数加1
                elif pomodoro.confirming:
                    pomodoro.confirmed = True
                    pomodoro.count += 1
        await asyncio.sleep(0.1)


async def main():
    # 共享变量设置
    internet = Internet()
    mqtt_client = MqttClient()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # 协程函数定义
    internet_task = asyncio.create_task(wifi_connect(internet))
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
    lcd_display_task = asyncio.create_task(
        lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(
        monitor_touch_buttons(pomodoro))

    # 启动协程
    await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())

附settings.toml内容

CIRCUITPY_WIFI_SSID = "<根据实际情况修改>"
CIRCUITPY_WIFI_PASSWORD = "<根据实际情况修改>"
CIRCUITPY_WEB_API_PASSWORD = "passw0rd"	#未使用
CIRCUITPY_WEB_API_PORT = 80	#未使用

TIME_ZONE = 8

# https://www.alapi.cn/api/view/54
LUNAR_API_KEY = "<根据实际情况修改>"
WEATHER_API_KEY = "<根据实际情况修改>"
WEATHER_CITY = "<根据实际情况修改,可以设置为城市中文名或拼音>"

# 一个番茄钟时间秒数
POMODORO_TIME = 900
# 番茄钟完成时,手动确认超时秒数
POMODORO_TIMEOUT = 100

MQTT_HOST = "<根据实际情况修改>"
MQTT_PORT = 1883
MQTT_USER = "<根据实际情况修改>"
MQTT_PASSWORD = "<根据实际情况修改>"

MQTT_TOPIC = "d1"
MQTT_CLIENT_ID = "<根据实际情况修改>"

三、源码下载

源码打包:(非最新版本,更新后的代码版本可以参考帖子的内容)

ESP32S3-TFT-CircuitPython-番茄日历钟源码-嵌入式开发相关资料下载-EEWORLD下载中心

⛳️END

本帖最后由 ltpop 于 2023-12-28 16:46 编辑
点赞  2023-12-28 16:39

演示视频


 

项目源码

"""
番茄日历钟
ltpop@163.com
202311
"""

# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color

# 当前设备联网状态
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0


# 当前天气状态
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 300.0


# 当前日期时间状态
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


# 番茄统计
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        self.time = os.getenv("POMODORO_TIME")
        self.timeout = os.getenv("POMODORO_TIMEOUT")
        self.wait = 0.1

# 连接wifi
async def wifi_connect(internet):
    # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)


    # 如果wifi没有正常连接,则切换为ap模式
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')



# 获取时间
async def fetch_time(internet, ntp_datetime):
    tz_offset=os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
            ntp_ok = False
            lunar_ok = False
            # 获取当前ntp时间,
            if not ntp:
                pool = socketpool.SocketPool(wifi.radio)
                try:
                    ntp = adafruit_ntp.NTP(pool, server = "cn.ntp.org.cn", tz_offset = tz_offset)
                    # 更新系统时间
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
            else:
                # 获取阴历
                pool = socketpool.SocketPool(wifi.radio)
                https = requests.Session(pool, ssl.create_default_context())
                lunar_response = https.get(f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            ntp_datetime.week_day = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
            # 仅在第二天时再更新
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                dt = time.localtime()
                print(f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
                await asyncio.sleep(wait_seconds)
        else:
            await asyncio.sleep(internet.wait)

# 获取天气信息
async def fetch_weather(internet, weather):
    # 获取天气API KEY
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # 获取天气城市:从配置文件中读取城市设置
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # 天气信息
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
            # 如果读取不到配置的城市,则获取当前IP城市
            if not weather_city:
                # 获取当前外网IP和城市
                ip_city_response = https.get("https://myip.ipip.net/json")
                ip_city_json = ip_city_response.json()
                if ip_city_json["ret"] == "ok":
                    weather_city = ip_city_json['data']['location'][2]
                    print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # 当前天气
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            weather_now_response = https.get(weather_now_url)
            weather_json = weather_now_response.json()
            if weather_json["results"]:
                now_weather = weather_json["results"][0]["now"]["text"]
                now_temperature = weather_json["results"][0]["now"]["temperature"]
            weather.text = now_weather
            weather.temperature = now_temperature
            print(f"当前天气:{now_weather},气温:{now_temperature}℃")


            # 未来天气预报
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# led显示
async def pixels_led(internet, pomodoro):
    # Neopixel LED控制
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    # 番茄等待确认中,显示为青蓝色闪烁
    blink = Blink(pixels, 0.5, color.CYAN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        # 番茄进行中,显示为红色常亮
        if pomodoro.run:
            # 番茄等待确认中,显示为
            if pomodoro.confirming:
                blink.animate()
            else:
                pixels.fill((255, 0, 0))
                pixels.show()
        else:
            # 否则显示为彩虹色
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# 屏幕显示
async def lcd_display(internet, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # 中文字体文件放在font目录下
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # 设置日期显示(左上角)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)


    # 设置时间显示(右上角)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # 设置农历显示(上中部)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)


    # 设置天气显示(左下角)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # 设置气温显示(下中部)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)


    # 设置番茄钟倒计时显示(中间)
    pomodoro_label = bitmap_label.Label(terminalio.FONT, color=0xFF00FF, scale=7)
    # 显示位置
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)

    # 设置倒番茄钟统计显示(右下角)
    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # 显示位置
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

    # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
    os.getenv("POMODORO_TIMEOUT")

    # 创建根group
    main_group = displayio.Group()
    main_group.append(group)
    # 展示
    display.root_group = main_group


    while True:
        if internet.state:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # 设置日期文本
            date_label.text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            # 设置时间文本
            time_label.text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            # 设置农历文本
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_label.text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"

        if weather.text:
            # 设置天气文本
            weather_label.text = f"{weather.text}"
            # 设置气温文本
            temperature_label.text = f"{weather.temperature}℃"
        else:
            weather_label.text = f'请先连接WIFI'

        timeout = pomodoro.wait
        # 更新番茄钟
        pomodoro_label.color = 0x00FFFF
        count_label.text = f"{pomodoro.count}"
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # 倒计时每秒更新一次
                sec = left_seconds % 1
                timeout = 1.0 - sec
                pomodoro_label.text = f"{minute:02d}:{second:02d}"
            else:
                # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    weather_label.text = f'番茄等待确认'
                    # 超时时显示为红色
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        else:
            timeout = pomodoro.wait

        await asyncio.sleep(timeout)



async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            # 按钮开始番茄计时
            if not pomodoro.run:
                pomodoro.run = True
                pomodoro.end = time.monotonic() + pomodoro.time
            # 番茄确认状态时,将番茄数加1
            elif pomodoro.confirming:
                pomodoro.confirmed = True
                pomodoro.count += 1
        await asyncio.sleep(pomodoro.wait)


async def main():
    # 共享变量设置
    internet = Internet()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # 协程函数定义
    internet_task = asyncio.create_task(wifi_connect(internet))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet,weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet,pomodoro))
    lcd_display_task = asyncio.create_task(lcd_display(internet, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(monitor_touch_buttons(pomodoro))

    # 启动协程
    await asyncio.gather(internet_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())

 

本帖最后由 ltpop 于 2023-11-17 22:49 编辑
点赞  2023-11-15 15:19
帖子使用了markdown格式,不知道什么原因,代码缩进没有了,但在编辑模式下是正常的。
点赞  2023-11-17 22:40

楼主辛苦了,感谢楼主提供的这么好技术分享,顶起来

点赞  2023-11-18 21:23

最终版本代码(有正确缩进)

"""
番茄日历钟
ltpop@163.com
202311 v26
"""

# TOML配置文件读取
import os
import time
import rtc
import board
import displayio
import terminalio
import touchio
import wifi
import ssl
import socketpool

# 需要导入asyncio、adafruit_ticks库
import asyncio
# 需要导入neopixel库
import neopixel
# 需要导入adafruit_ntp库
import adafruit_ntp
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_imageload库
import adafruit_imageload
# 需要导入adafruit_requests库
import adafruit_requests as requests
# 需要导入adafruit_led_animation库
from adafruit_led_animation.animation.blink import Blink
from adafruit_led_animation.animation.rainbow import Rainbow
from adafruit_led_animation.sequence import AnimationSequence
import adafruit_led_animation.color as color
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT


# 当前设备联网状态
class Internet:
    def __init__(self):
        self.state = False
        self.wait = 30.0

# MQTT
class MqttClient:
    def __init__(self):
        self.mqtt = None
        self.is_connected = False

        self.host = os.getenv("MQTT_HOST")
        self.port = os.getenv("MQTT_PORT")
        self.user = os.getenv("MQTT_USER")
        self.password = os.getenv("MQTT_PASSWORD")

        self.client_id =  os.getenv("MQTT_CLIENT_ID")
        self.topic =  os.getenv("MQTT_TOPIC")
        self.led = "led1"
        self.cmd_topic = f"cmd/{self.topic}/{self.led}"
        self.state_topic = f"stat/{self.topic}/{self.led}"

        self.led = None
        self.wait = 0.1

    def connect(self, mqtt_cli, userdata, flags, rc):
        print(f"Connected to MQTT {self.host}")
        self.is_connected = True
        mqtt_cli.subscribe(self.state_topic)

    def disconnect(self, mqtt_cli, userdata, rc):
        print(f"Disconnected from MQTT {self.host}")
        self.is_connected = False

    def message(self, client, topic, message):
        print(f"New message on topic {topic}: {message}")
        # 0-关,1-关,2-切换
        if topic == self.state_topic:
            if message == '0':
                self.led = False
            elif message == '1':
                self.led = True

# 当前天气状态
class Weather:
    def __init__(self):
        self.city = ''
        self.text = None
        self.temperature = 20.0
        self.wait = 3600.0


# 当前日期时间状态
class NTPDatetime:
    def __init__(self):
        self.datetime = None
        self.ntp = None
        self.ntp_sync = None
        self.next_lunar_request_time = None
        self.weekday = None
        self.week_name = None
        self.lunar_year_ganzhi = None
        self.lunar_month_chinese = None
        self.lunar_day_chinese = None
        self.wait = 3600.0
        self.retry = 10


# 番茄统计
class Pomodoro:
    def __init__(self):
        self.count = 0
        self.run = False
        self.end = 0
        self.confirming = False
        self.confirmed = False
        self.time = os.getenv("POMODORO_TIME")
        self.timeout = os.getenv("POMODORO_TIMEOUT")
        self.wait = 0.2


# 连接wifi
async def wifi_connect(internet):
    # 事件在settings.toml配置 WIFI信息:CIRCUITPY_WIFI_SSID=<wifi ssid名称>和CIRCUITPY_WIFI_PASSWORD=<wifi密码>
    failure_count = 0
    while True:
        internet.state = wifi.radio.connected
        if not wifi.radio.connected:
            try:
                wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv(
                    "CIRCUITPY_WIFI_PASSWORD"))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
        await asyncio.sleep(internet.wait)

    # 如果wifi没有正常连接,则切换为ap模式
    # if not wifi_state:
    #     wifi.radio.start_ap(ssid = 'esp32s3_ap', password = 'abcd1234')


async def mqtt_connect(mqtt_client):
    while True:
        if not mqtt_client.mqtt:
            print("Set up a MiniMQTT Client")
            pool = socketpool.SocketPool(wifi.radio)
            mqtt = MQTT.MQTT(
                broker=mqtt_client.host,
                username=mqtt_client.user,
                password=mqtt_client.password,
                socket_pool=pool,
                client_id=mqtt_client.client_id
            )

            mqtt.on_connect = mqtt_client.connect
            mqtt.on_disconnect = mqtt_client.disconnect
            mqtt.on_message = mqtt_client.message
            mqtt_client.mqtt = mqtt
            mqtt.connect()

        try:
            mqtt_client.mqtt.loop()
        except (ValueError, RuntimeError) as e:
            print("Failed to get data, retrying\n", e)
            mqtt_client.mqtt.reconnect()
            continue

        await asyncio.sleep(mqtt_client.wait)


# 获取时间
async def fetch_time(internet, ntp_datetime):
    tz_offset = os.getenv("TIME_ZONE")
    the_rtc = rtc.RTC()
    ntp = None
    while True:
        if internet.state:
            ntp_ok = False
            lunar_ok = False
            pool = socketpool.SocketPool(wifi.radio)
            # 获取当前ntp时间,
            if not ntp:
                try:
                    ntp = adafruit_ntp.NTP(
                        pool, server="ntp.ntsc.ac.cn", tz_offset=tz_offset)
                    # 更新系统时间
                    the_rtc.datetime = ntp.datetime
                    ntp_datetime.ntp = ntp
                    ntp_ok = True
                    ntp_datetime.ntp_sync = True
                except OSError as error:
                    print("NTP failed, retrying\n", error)
                    ntp = None
                    
            lunar_need_update = False
            if not ntp_datetime.next_lunar_request_time:
                lunar_need_update = True
            elif time.time() > ntp_datetime.next_lunar_request_time:
                # 超过下一次请求时间
                lunar_need_update = True
                
            if lunar_need_update:
                # 获取阴历
                try:
                    https = requests.Session(pool, ssl.create_default_context())
                    lunar_response = https.get(
                        f'https://v2.alapi.cn/api/lunar?token={os.getenv("LUNAR_API_KEY")}')
                except RuntimeError as error:
                    print("Lunar request failed, retrying\n", error)
                    lunar_response = None
                if lunar_response:
                    lunar_json = lunar_response.json()
                    if lunar_json['code'] == 200:
                        lunar_data = lunar_json['data']
                        # print(lunar_data)
                        week_day = lunar_data['week_no']
                        if week_day:
                            ntp_datetime.week_day = week_day

                        week_name = lunar_data['week_name']
                        if week_name:
                            ntp_datetime.week_name = week_name

                        lunar_year_ganzhi = lunar_data['ganzhi_year']
                        if lunar_year_ganzhi:
                            ntp_datetime.lunar_year_ganzhi = lunar_year_ganzhi

                        lunar_month_chinese = lunar_data['lunar_month_chinese']
                        if lunar_month_chinese:
                            ntp_datetime.lunar_month_chinese = lunar_month_chinese

                        lunar_day_chinese = lunar_data['lunar_day_chinese']
                        if lunar_day_chinese:
                            ntp_datetime.lunar_day_chinese = lunar_day_chinese
                        lunar_ok = True
                        print(
                            f"当前阴历:{lunar_year_ganzhi}年 {lunar_month_chinese}{lunar_day_chinese}")
                        print(f"当前星期:{week_name}")
                        # 设置下一次更新时间为第二天0点
                        current_time = time.localtime()
                        ntp_datetime.next_lunar_request_time = time.mktime((current_time.tm_year, current_time.tm_mon, current_time.tm_mday + 1, 0, 0, 0, 0, 0, 0))
            # 仅在第二天时再更新
            if not ntp_ok or not lunar_ok:
                await asyncio.sleep(ntp_datetime.retry)
            else:
                # dt = time.localtime()
                # print(
                #     f"当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour:02d}:{dt.tm_min:02d}")
                # wait_seconds = (24-dt.tm_hour)*60+(60 - dt.tm_sec)
                await asyncio.sleep(ntp_datetime.wait)
        else:
            await asyncio.sleep(internet.wait)

# 获取天气信息
async def fetch_weather(internet, weather):
    # 获取天气API KEY
    weather_api_key = os.getenv("WEATHER_API_KEY")
    # 获取天气城市:从配置文件中读取城市设置
    weather_city = os.getenv("WEATHER_CITY")
    while True:
        if internet.state:
            # 天气信息
            pool = socketpool.SocketPool(wifi.radio)
            https = requests.Session(pool, ssl.create_default_context())
            # 如果读取不到配置的城市,则获取当前IP城市
            if not weather_city:
                # 获取当前外网IP和城市
                try:
                    ip_city_response = https.get("https://myip.ipip.net/json")
                except RuntimeError as error:
                    print("IP city request failed, retrying\n", error)    
                if ip_city_response:
                    ip_city_json = ip_city_response.json()
                    if ip_city_json["ret"] == "ok":
                        weather_city = ip_city_json['data']['location'][2]
                        print(f"当前IP城市:{weather_city}")
            weather.city = weather_city

            # 当前天气
            weather_now_url = f"https://api.seniverse.com/v3/weather/now.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            try:
                weather_now_response = https.get(weather_now_url)
            except RuntimeError as error:
                print("Weather request failed, retrying\n", error)
                weather_now_response = None
            if weather_now_response:
                weather_json = weather_now_response.json()
                if weather_json["results"]:
                    now_weather = weather_json["results"][0]["now"]["text"]
                    now_temperature = weather_json["results"][0]["now"]["temperature"]
                weather.text = now_weather
                weather.temperature = now_temperature
                print(f"当前天气:{now_weather},气温:{now_temperature}℃")

            # 未来天气预报
            # weather_daily_url = f"https://api.seniverse.com/v3/weather/daily.json?key={weather_api_key}&location={weather_city}&language=zh-Hans&unit=c&start=-1&days=5"
            # weather_response = https.get(weather_daily_url)
            # weather_json = weather_response.json()
            # if weather_json["results"]:
            #     today_weather = weather_json["results"][0]["daily"][0]["text_day"]
            #     today_temprature_low = weather_json["results"][0]["daily"][0]["low"]
            #     today_temprature_high = weather_json["results"][0]["daily"][0]["high"]
            #     today_humidity = weather_json["results"][0]["daily"][0]["humidity"]

            # print(f"明天天气:{today_weather},气温:{today_temprature_low}℃ - {today_temprature_high}℃,温度:{today_humidity}%")

            await asyncio.sleep(weather.wait)
        else:
            await asyncio.sleep(internet.wait)

# led显示
async def pixels_led(internet, pomodoro):
    # Neopixel LED控制
    pixel_pin = board.NEOPIXEL
    pixel_num = 1
    pixels = neopixel.NeoPixel(
        pixel_pin, pixel_num, brightness=0.05, auto_write=False)
    rainbow = Rainbow(pixels, speed=0.1, period=2)
    blink = Blink(pixels, 0.5, color.GREEN)
    animations = AnimationSequence(
        rainbow,
        advance_interval=5,
        auto_clear=True,
    )

    while True:
        # 番茄进行中,显示为红色常亮
        if pomodoro.run:
            # 番茄等待确认中,显示为绿色闪烁
            if pomodoro.confirming:
                blink.speed = 0.5
                blink.color = color.GREEN
                blink.animate()
            else:
                pixels.fill((255, 0, 0))
                pixels.show()
        elif not internet.state:
            blink.speed = 2
            blink.color = color.RED
            blink.animate()
        else:
            # 否则显示为彩虹色
            animations.animate()
        await asyncio.sleep(pomodoro.wait)


# 屏幕显示
async def lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro):

    display = board.DISPLAY
    # 中文字体文件放在font目录下
    font_file = "font/wenquanyi_13px.pcf"
    font = bitmap_font.load_font(font_file)

    group = displayio.Group()

    # 设置日期显示(左上角)
    date_label = bitmap_label.Label(terminalio.FONT, scale=1)
    date_label.anchor_point = (0.0, 0.0)
    date_label.anchored_position = (5, 5)
    date_label.text = "2023-11-11"
    group.append(date_label)

    # 设置时间显示(右上角)
    time_label = bitmap_label.Label(terminalio.FONT, color=0xFF0000, scale=2)
    time_label.anchor_point = (1.0, 0.0)
    time_label.anchored_position = (display.width - 2, 2)
    time_label.text = "11:30"
    group.append(time_label)

    # 设置农历显示(上中部)
    lunar_label = bitmap_label.Label(font, color=0x0000FF, scale=1)
    lunar_label.anchor_point = (0.5, 0.0)
    lunar_label.anchored_position = (display.width // 2, 5)
    lunar_label.text = "九月廿八"
    group.append(lunar_label)

    # 设置天气显示(左下角)
    weather_label = bitmap_label.Label(font, color=0x00FF00, scale=1)
    weather_label.anchor_point = (0.0, 1.0)
    weather_label.anchored_position = (2, display.height - 5)
    weather_label.text = "晴"
    group.append(weather_label)

    # 设置气温显示(下中部)
    temperature_label = bitmap_label.Label(font, color=0xFFFF00, scale=1)
    temperature_label.anchor_point = (0.5, 1.0)
    temperature_label.anchored_position = (
        display.width // 2, display.height - 5)
    temperature_label.text = "5℃"
    group.append(temperature_label)

    # 设置MQTT状态(下中右部)
    mqtt_label = bitmap_label.Label(terminalio.FONT, scale=1)
    mqtt_label.anchor_point = (0.5, 1.0)
    mqtt_label.anchored_position = (display.width // 1.3, display.height - 5)
    mqtt_label.text = "-"
    group.append(mqtt_label)

    # 设置番茄钟倒计时显示(中间)
    pomodoro_label = bitmap_label.Label(
        terminalio.FONT, color=0xFF00FF, scale=7)
    # 显示位置
    pomodoro_label.anchor_point = (0.5, 0.5)
    pomodoro_label.anchored_position = (
        display.width // 2, display.height // 2)
    pomodoro_label.text = "15:00"
    group.append(pomodoro_label)


    # 设置倒番茄钟统计显示(右下角)
    # with open("img/tomato.bmp", "rb") as f:
    #     image, palette = adafruit_imageload.load(f, bitmap=displayio.Bitmap, palette=displayio.Palette)
    #     sprite = displayio.TileGrid(image, pixel_shader=palette)
    #     group.append(sprite)

    count_label = bitmap_label.Label(terminalio.FONT, color=0x00FFFF, scale=2)
    # 显示位置
    count_label.anchor_point = (1, 1)
    count_label.anchored_position = (display.width - 2, display.height - 2)
    count_label.text = "0"
    group.append(count_label)

    # 番茄倒计时结束时的确认超时时间,超时不确认该番茄不计入统计
    os.getenv("POMODORO_TIMEOUT")

    # 创建根group
    main_group = displayio.Group()
    main_group.append(group)
    # 展示
    display.root_group = main_group

    while True:
        if ntp_datetime.ntp_sync:
            dt = time.localtime()
            # text = f'当前时间:{dt.tm_year}-{dt.tm_mon}-{dt.tm_mday} {dt.tm_hour}:{dt.tm_min} ,当前城市:{weather.city}, 当前天气:{weather.text} ,温度:{weather.temperature}℃'
            # 设置日期文本
            date_text = f"{dt.tm_year:04d}-{dt.tm_mon:02d}-{dt.tm_mday:02d}"
            if date_label.text != date_text:
                date_label.text = date_text
            # 设置时间文本
            time_text = f"{dt.tm_hour:02d}:{dt.tm_min:02d}"
            if time_label.text != time_text:
                time_label.text = time_text
            # 设置农历文本
            if ntp_datetime.lunar_month_chinese and ntp_datetime.lunar_day_chinese:
                lunar_text = f"{ntp_datetime.lunar_month_chinese}{ntp_datetime.lunar_day_chinese}"
                if lunar_label.text != lunar_text:
                    lunar_label.text = lunar_text

        if weather.text:
            # 设置天气文本
            if weather_label.text != weather.text:
                weather_label.text = weather.text
            
            # 设置气温文本
            temperature_text = f"{weather.temperature}℃"
            if temperature_label.text != temperature_text:
                temperature_label.text = temperature_text
        else:
            weather_text = f'请先连接WIFI'
            if weather_label.text != weather_text:
                weather_label.text = weather_text

        # MQTT LED状态
        if mqtt_client.mqtt.is_connected:
            if mqtt_client.led:
                # 亮灯显示为红色
                led_text =  "on"
                mqtt_label.color=0xFF0000
            else:
                led_text =  "off"
                mqtt_label.color=0xFFFFFF

            if mqtt_label.text != led_text:
                mqtt_label.text = led_text
        # 更新番茄钟
        pomodoro_label.color = 0x00FFFF
        count_text = f"{pomodoro.count}"
        if count_label.text != count_text:
            count_label.text = count_text
        if pomodoro.run:
            left_seconds = pomodoro.end - time.monotonic()
            if left_seconds >= 0:
                minute = int(left_seconds / 60)
                second = int(left_seconds % 60)
                # 倒计时每秒更新一次
                pomodoro_text = f"{minute:02d}:{second:02d}"
                if pomodoro_label.text != pomodoro_text:
                    pomodoro_label.text = pomodoro_text
            else:
                # 番茄完成时,需要在超时时间内按键确认方可统计为完成的番茄数
                timeout_seconds = abs(left_seconds)
                if not pomodoro.confirmed and timeout_seconds < pomodoro.timeout:
                    pomodoro.confirming = True
                    weather_label.text = f'番茄等待确认'
                    # 超时时显示为红色
                    pomodoro_label.color = 0xFF0000
                    pomodoro_label.text = f"{int(pomodoro.timeout - timeout_seconds):02d}"
                else:
                    pomodoro_label.text = f'{int(pomodoro.time/60):02d}:{int(pomodoro.time%60):02d}'
                    pomodoro.confirming = False
                    pomodoro.confirmed = False
                    pomodoro.run = False

        await asyncio.sleep(pomodoro.wait)


async def monitor_touch_buttons(pomodoro):
    touch = touchio.TouchIn(board.D10)
    while True:
        if touch.value:
            await asyncio.sleep(0.1)
            if touch.value:
                # 按钮开始番茄计时
                if not pomodoro.run:
                    pomodoro.run = True
                    pomodoro.end = time.monotonic() + pomodoro.time
                # 番茄确认状态时,将番茄数加1
                elif pomodoro.confirming:
                    pomodoro.confirmed = True
                    pomodoro.count += 1
        await asyncio.sleep(0.1)


async def main():
    # 共享变量设置
    internet = Internet()
    mqtt_client = MqttClient()
    ntp_datetime = NTPDatetime()
    weather = Weather()
    pomodoro = Pomodoro()

    # 协程函数定义
    internet_task = asyncio.create_task(wifi_connect(internet))
    mqtt_task = asyncio.create_task(mqtt_connect(mqtt_client))
    fetch_time_task = asyncio.create_task(fetch_time(internet, ntp_datetime))
    fetch_weather_task = asyncio.create_task(fetch_weather(internet, weather))
    pixels_led_task = asyncio.create_task(pixels_led(internet, pomodoro))
    lcd_display_task = asyncio.create_task(
        lcd_display(internet, mqtt_client, ntp_datetime, weather, pomodoro))
    monitor_touch_buttons_task = asyncio.create_task(
        monitor_touch_buttons(pomodoro))

    # 启动协程
    await asyncio.gather(internet_task, mqtt_task, fetch_time_task, fetch_weather_task, pixels_led_task, lcd_display_task, monitor_touch_buttons_task)


asyncio.run(main())

欢迎大家一起学习交流

 

 

本帖最后由 ltpop 于 2023-11-27 12:02 编辑
点赞  2023-11-26 23:39
电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 京公网安备 11010802033920号
    写回复