成都网站制作怎么收费,百度seo营销,wordpress 在线答题,iis 默认网站 删除同样还是需要开通钉钉应用这里就不错多说了
第一步:梳理逻辑流程 前提#xff1a;打卡的机器是使用postgres数据库#xff0c;由于因为某些原因#xff0c;钉钉userId 我已经提前获取到了存放到数据库里。 1.用户打卡成功后#xff0c;我们应该监听数据库进行查询#xf…同样还是需要开通钉钉应用这里就不错多说了
第一步:梳理逻辑流程 前提打卡的机器是使用postgres数据库由于因为某些原因钉钉userId 我已经提前获取到了存放到数据库里。 1.用户打卡成功后我们应该监听数据库进行查询然后获取到打卡的时间在通过钉钉的工作消息接口发送消息给当前考勤打卡的用户这样用户就可以知道我上班的打卡时间
现在我们看看应该怎么去实现
先定义钉钉接口先Token
appkey
appsecret
ding_url https://oapi.dingtalk.com/async def dingTalkToken():async with httpx.AsyncClient() as client:response await client.get(ding_url gettoken, params{appkey: appkey, appsecret: appsecret})# 解析响应JSONresult response.json()# 提取Access Tokenaccess_token result.get(access_token)return access_token
钉钉工作消息接口 async def dingTalkTokenAsyncsend_v2(access_token, kqTime, userid):params {agent_id: ,msg: {msgtype: text,text: {content: 打卡成功 kqTime}},userid_list: userid}async with httpx.AsyncClient() as client:response await client.post(ding_url topapi/message/corpconversation/asyncsend_v2?access_token access_token, paramsparams)# 解析响应JSONresult response.json()# 提取Access Tokenerrcode result.get(errcode)return errcode
以上向钉钉工作发送消息的接口已经完成了
接下来就是核心代码
async def check_notifications():print(执行)try:# 连接到数据库with psycopg2.connect(dbnameposql.dbname, userposql.user, passwordposql.password, hostposql.host,portposql.port) as connection:# 创建一个游标对象用于执行 SQL 语句with connection.cursor() as cursor:# 执行查询cursor.execute(LISTEN punch_event_channel)while True:connection.commit() # 提交事务await asyncio.sleep(1)# 检查是否有通知connection.poll() # 从服务器获取通知if connection.notifies:notify connection.notifies[0]# 执行查询cursor.execute(这填写打卡系统的数据库使用ID去查询最新
SELECT * FROM table WHERE id %s % int(notify.payload))# 获取查询结果result cursor.fetchone()columns [desc[0] for desc in cursor.description]result_dict dict(zip(columns, result))# 处理 datetime 对象的序列化result_dict[timestamp] result_dict.get(timestamp, None)if result_dict[timestamp]:result_dict[timestamp] datetime.fromisoformat(result_dict[timestamp])kqTime result_dict[att_date] result_dict[att_time]# 获取用户IDuser_name result_dict[person_name]sql_str EXEC GetUserDingByName UserName N%s; % user_nameuser_id query_user_info(sql_str)if user_id ! null:# 发送HTTP POST请求获取Access Tokenaccess_token await dingTalkToken()codes await dingTalkTokenAsyncsend_v2(access_token, kqTime, user_id)print(f考勤时间: result_dict[att_date] result_dict[att_time])breakelse:break# 去执行 钉钉推送模块# messages 发送成功else:print({notify_payload: 没有消息})await asyncio.sleep(1)except Exception as e:print({error: str(e)})
解释一下代码 # 执行查询
cursor.execute(LISTEN punch_event_channel)
这里我是在考勤机器的数据库里做了一个punch_event_channel 的频道而这个频道是我创建了一个触发函数用来触发最新数据库里的数据
接下来是创建触发函数
-- 创建触发器函数
CREATE OR REPLACE FUNCTION notify_punch_event()
RETURNS TRIGGER AS
$$
BEGINPERFORM pg_notify(punch_event_channel, new_punch_event);RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 创建触发器关联到表的 AFTER INSERT 事件上CREATE TRIGGER 触发器名称XXXXX
AFTER INSERT ON 表名
FOR EACH ROW
EXECUTE PROCEDURE notify_punch_event();
在sql 工具执行这两句就可以了替换成你自己的数据库
另外这里是我内部拿取钉钉Userid的数据库我就不放代码了
# 获取用户ID
user_name result_dict[person_name]
sql_str EXEC GetUserDingByName UserName N%s; % user_name
user_id query_user_info(sql_str)
你们可以根据自己方式来获取
最后就是使用定时任务来
async def periodic_task():while True:await check_notifications()await asyncio.sleep(1) # 1秒钟检查一次可以根据需要调整间隔if __name__ __main__:# 启动定时任务app.add_task(periodic_task())# 启动 Sanic 应用app.run(host0.0.0.0, port8089, workers8)
最后记得导入包
import psycopg2
import httpx
import pymssql # 这是sql server 数据库连接
如果写的好动动你们发财的小手点赞,对你有帮助也可以打赏请我喝杯咖啡提提神感谢各位兄弟了