import requests import logging from flask import Flask, request, jsonify from urllib.parse import quote import hashlib import redis import random import string import time app = Flask(__name__) logging.basicConfig(level=logging.INFO) VOICEGLOW_API_KEY = “REDACTED” VOICEGLOW_API_URL = "https://na-vg-edge.moeaymandev.workers.dev" ERPNext_API_KEY = "REDACTED" ERPNext_SECRET = "REDACTED" # Initialize Redis client redis_client = redis.StrictRedis(host='localhost', port=6379, db=0) LOCK_EXPIRE_TIME = 60 # Time in seconds for lock expiration PROCESS_EXPIRE_TIME = 120 # Time in seconds for processing status expiration MAX_RETRY_ATTEMPTS = 5 # Max attempts to retry updating ERPNext user def create_voiceglow_agent(agent_id, agent_name): payload = { "agentPlatform": "vg", "title": agent_name, "description": "This is an Airo agent", "language": "en", "isActive": True, "theme": "custom-light", "fontFamily": "Inter", "scrollAnimation": True, "roundedImageURL": "https://file.toolbase.ai/airo/assets/airo.round.png", "branding": "✨ Powered by Airo.Chat:airo.chat", "proactiveMessage": "👋 Hi, how can Airo help you today?", "messageDelayMS": 1000, "maxTokensUsage": 100000, "autoStartWidget": False, "recordChatHistory": True, "UIhandoffTitle": "Hi there!", "UIhandoffSubtitle": "We reply within minutes.", "vg_defaultModel": "gpt-4o", "vg_enableUIEngine": False, "vg_temperature": 0.5, "vg_systemPrompt": "You're a virtual assistant working for a company with the following relevant information: \n {about_context} \n\n Your main role is to be helpful and engaging. > "vg_initMessages": ["Hi, I'm Airo, your virtual assistant, how can I help you today?", "Hello, I'm Airo, how can I assist you today?"], "chatForget": False, "enableAudioSupport": False, "enableAITranslate": False, "alwaysShowHandoff": False, "showHandoffEvenIfOffline": False, "fixedHandoffPopup": True, "enableQuickFileUpload": False, "customThemeJSONString": "{\"themeType\":\"light\",\"primary\":\"#EF6820\"}" } logging.info(f"Payload for creating Voiceglow Agent: {payload}") response = requests.post( f"{VOICEGLOW_API_URL}/v2/agents/{agent_id}", headers={ "Authorization": f"Bearer {VOICEGLOW_API_KEY}", "Content-Type": "application/json" }, json=payload ) logging.info(f"Create Agent response: {response.status_code}, {response.text}") return response def create_voiceglow_org_client(agent_id, name): payload = { "name": f"{name}", "preferredLanguage": "en", "widgetIDs": [agent_id], "adminIDs": [], "canSelfEdit": False, "disallowAnyTags": False, "dashboardLayout": "horizontal", "squarePhotoURL": "https://file.toolbase.ai/airo/assets/airo.square.png" } logging.info(f"Payload for creating Org Client: {payload}") response = requests.put( f"{VOICEGLOW_API_URL}/v2/orgs", headers={ "Authorization": f"Bearer {VOICEGLOW_API_KEY}", "Content-Type": "application/json" }, json=payload ) logging.info(f"Create Org Client response: {response.status_code}, {response.text}") if response.status_code == 200: org_id = response.json()['data']['ID'] return org_id return None def add_user_to_voiceglow_org_client(org_id, name, email, password): payload = { "orgId": org_id, "name": name, "squarePhotoURL": "https://file.toolbase.ai/airo/assets/airo.square.png", "email": email, "dashboardPassword": password, "canAccess": ["/home", "/convos", "/analytics", "/kb", "/settings"], "isOrgAdmin": False, "notificationsSettings": { "notifyThrough": "all", "notifyIf": "all" } } logging.info(f"Payload for adding user to Org Client: {payload}") response = requests.post( f"{VOICEGLOW_API_URL}/v2/clients/{org_id}", headers={ "Authorization": f"Bearer {VOICEGLOW_API_KEY}", "Content-Type": "application/json" }, json=payload ) logging.info(f"Add User to Org Client response: {response.status_code}, {response.text}") return response def generate_secure_password(email, creation_time): return hashlib.sha256(f"{email}{creation_time}".encode()).hexdigest() def generate_agent_id(): return f"airo_{''.join(random.choices(string.ascii_letters + string.digits, k=9))}" def update_erpnext_user_with_custom_url(email, password, agent_id): erpnext_url = f"https://app.airo.chat/api/resource/User/{email}" headers = { "Authorization": f"token {ERPNext_API_KEY}:{ERPNext_SECRET}", "Content-Type": "application/json" } custom_url = f"https://client.airo.chat/app/na/client?username={quote(email)}&password={password}" payload = { "data": { "custom_url": custom_url, "custom_username": email, "custom_password": password, "custom_agentid": agent_id } } logging.info(f"Updating ERPNext user with payload: {payload}") attempts = 0 while attempts < MAX_RETRY_ATTEMPTS: try: response = requests.put(erpnext_url, headers=headers, json=payload) logging.info(f"Update ERPNext user response: {response.status_code}, {response.text}") response.raise_for_status() return except requests.exceptions.HTTPError as errh: if response.status_code == 417: logging.warning(f"HTTP Error 417: TimestampMismatchError, retrying... Attempt {attempts + 1}/{MAX_RETRY_ATTEMPTS}") try: response = requests.get(erpnext_url, headers=headers) response.raise_for_status() latest_doc = response.json() logging.info(f"Fetched latest ERPNext user document: {latest_doc}") # Update the payload with the latest timestamp and any other necessary fields latest_timestamp = latest_doc['data']['modified'] payload['data']['modified'] = latest_timestamp except requests.exceptions.HTTPError as fetch_errh: logging.error(f"HTTP Error while fetching latest document: {fetch_errh}") return except requests.exceptions.RequestException as fetch_err: logging.error(f"Error while fetching latest document: {fetch_err}") return attempts += 1 time.sleep(1) # Adding a delay between retries else: logging.error(f"HTTP Error: {errh}") return except requests.exceptions.RequestException as err: logging.error(f"Error: {err}") return @app.route('/webhook', methods=['POST']) def handle_webhook(): logging.info("Webhook received") data = request.json logging.info(f"Webhook data: {data}") email = data['email'] creation_time = data['creation'] lock_key = f"lock_{email}" if redis_client.setnx(lock_key, "locked"): redis_client.expire(lock_key, LOCK_EXPIRE_TIME) try: if redis_client.exists(email): logging.info(f"Client {email} already processed") return jsonify({"status": "client already processed"}), 200 secure_password = generate_secure_password(email, creation_time) agent_id = generate_agent_id() agent_name = email.split('@')[0].replace('+', ' ').title() if any(role['role'] == 'Client' for role in data['roles']): agent_response = create_voiceglow_agent(agent_id, agent_name) if agent_response.status_code == 200: org_id = create_voiceglow_org_client(agent_id, agent_name) if org_id: user_response = add_user_to_voiceglow_org_client(org_id, agent_name, email, secure_password) if user_response.status_code == 200: update_erpnext_user_with_custom_url(email, secure_password, agent_id) redis_client.setex(email, PROCESS_EXPIRE_TIME, "processed") logging.info(f"Client {email} marked as processed in Redis") return jsonify({"status": "success"}), 200 else: logging.error("Failed to create Voiceglow agent") return jsonify({"status": "failed to create agent"}), 500 return jsonify({"status": "ignored"}), 200 finally: redis_client.delete(lock_key) else: logging.info(f"Client {email} is currently being processed by another worker") return jsonify({"status": "client is currently being processed"}), 200 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)