from __future__ import annotations
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
from .constants import *
from .log import LOG
from .models import *
from .oauth import get_access_token
from .utils import *
[docs]class ZoomAPI:
# noinspection GrazieInspection
"""
Helper client to interact with the `Zoom API v2`_
.. _Zoom API v2: https://marketplace.zoom.us/docs/api-reference/zoom-api/
"""
__slots__ = (
# required for `@cached_property`
'__dict__',
# instance attributes
'_session',
'_account_id',
'_access_token',
'_client_id',
)
def __init__(self, client_id: str,
client_secret: str,
account_id: str = None,
local=False):
self._session = session = requests.Session()
self._account_id = account_id = account_id or os.getenv('ZOOM_ACCOUNT_ID')
self._client_id = client_id
if local:
self._access_token = token = 'abc12345'
else:
self._access_token = token = get_access_token(
session, account_id, client_id, client_secret,
)
session.headers['Authorization'] = f'Bearer {token}'
session.headers['Content-Type'] = 'application/json'
[docs] @classmethod
def dummy_client(cls):
"""Create a dummy :class:`ZoomAPI` client, for testing purposes."""
return cls('...', '...', local=True)
def __repr__(self):
cls_name = self.__class__.__name__
masked_token = f'{self._access_token[:5]}***'
return (f'{cls_name}(access_token={masked_token!r}, '
f'account_id={self._account_id!r}, '
f'client_id={self._client_id!r})')
def _new_session(self) -> requests.Session:
session = requests.Session()
session.headers['Authorization'] = f'Bearer {self._access_token}'
session.headers['Content-Type'] = 'application/json'
return session
def _get(self, url: str, params: dict = None, session: requests.Session | None = None):
LOG.debug('GET %s, params=%s', url, params)
return (session or self._session).get(url, params=params)
def _post(self, url: str, data: dict = None, session: requests.Session | None = None):
LOG.debug('POST %s, data=%s', url, data)
return (session or self._session).post(url, json=data)
[docs] @log_time
def bulk_create_meetings(self, col_header_to_kwarg: dict[str, str] = None,
*,
rows: list[RowType] = None,
excel_file: str | os.PathLike[str] = None,
max_threads=10,
process_row: ProcessRow = None,
update_row: UpdateRow = None,
out_file: str | os.PathLike[str] = None,
default_timezone='UTC',
dry_run=False):
# noinspection GrazieInspection
"""``POST /users/{userId}/meetings``: Use this API to *bulk*-create meetings, given
a list of meetings to create.
If the rows containing meetings to create lives in an Excel (.xlsx) file,
then `excel_file` must be passed in, and contain the filepath of the Excel
file to retrieve the meeting details from; else, `rows` must be passed in
with a list of meetings to create.
Note that to read from Excel, the ``sheet2dict`` library is required;
this can be installed easily via::
$ pip install zoom-api-helper[excel]
``col_header_to_kwarg`` is a mapping of column headers to keyword arguments,
as accepted by the :meth:`create_meeting` method. If the header names are
all title-cased versions of the keyword arguments, then this argument does
*not* need to be passed in. See also, :attr:`constants.CREATE_MEETING_KWARGS`
for a full list of acceptable keywords for the *Create Meeting* API; note
that these are specified as *values* in the key-value pairing.
``process_row`` is an optional function or callable that will be called with
a copy of each *row*, or individual meeting info. The function can modify
the row in place as desired.
``update_row`` is an optional function or callable that will be called with
the HTTP response data from the Create Meetings API, and the associated
row from the Excel file. The function can modify the row in place as desired.
If ``dry_run`` is enabled, then no API calls will be made, and this function
will essentially be a no-op; however, useful logging output is printed for
debugging purposes. Enable this parameter along with the :func:`setup_logging`
helper function, in a debug environment.
**How It Works**
This function scans in a list of rows containing a list of meetings
to create, either from a local file or from the ``rows`` parameter.
Then, it calls ``process_row`` on each row, and also retrieves a
mapping of column name to keyword argument via ``col_header_to_kwarg``.
Then, it concurrently creates a list of meetings via the Zoom
**Create Meetings** API. Finally, it calls ``update_row`` on each
response and row pair, and writes out the updated meeting info to
an output CSV file named ``out_file``, which defaults to
``{excel-file-name-without-ext}}.out.csv`` if an output filepath
is not specified.
**References**
API documentation:
- https://marketplace.zoom.us/docs/api-reference/zoom-api/methods/#operation/meetingCreate
If a naive `datetime` object is provided for ``start_time`` or any other field
(i.e. one with no timezone information) the value for ``timezone`` will determine
which timezone to associate with the `datetime` object - defaults to *UTC* time
if not specified.
For a list of supported timezones, please see:
- https://marketplace.zoom.us/docs/api-reference/other-references/abbreviation-lists/#timezones
"""
# noinspection PyUnresolvedReferences
import sheet2dict
def to_snake_case(s: str):
return s.replace(' ', '_').replace('-', '_').lower()
if rows:
col_headers = rows[0].keys()
else:
ws = sheet2dict.Worksheet()
ws.xlsx_to_dict(excel_file)
col_headers = ws.sheet_items[0].keys()
rows = ws.sheet_items
header_to_kwarg = {kwarg: kwarg for kwarg in CREATE_MEETING_KWARGS}
if not col_header_to_kwarg:
col_header_to_kwarg = {h: to_snake_case(h) for h in col_headers}
for h in col_headers:
kwarg = col_header_to_kwarg.get(h) or to_snake_case(h)
if kwarg in CREATE_MEETING_KWARGS:
header_to_kwarg[h] = kwarg
meetings_to_create = []
if process_row is None:
def do_nothing(_row): return True
process_row = do_nothing
for row in rows:
# copy row so as not to modify it directly.
copied_row = row.copy()
# optional: process the row.
is_valid = process_row(copied_row)
if not is_valid:
continue
# retrieve meeting info.
mtg = {header_to_kwarg[h]: copied_row[h] for h in header_to_kwarg
if h in copied_row}
# add default fields.
if 'timezone' not in mtg:
mtg['timezone'] = default_timezone
meetings_to_create.append(mtg)
# if it's a dry run, print useful info for debugging purposes, then quit.
if dry_run:
print('Column Header to Keyword Argument:')
print(json.dumps(header_to_kwarg, indent=2))
print()
print(f'Have {len(meetings_to_create)} Meetings to Create:')
print(json.dumps(meetings_to_create, indent=2, cls=CustomEncoder))
return
LOG.debug('Column Header to Keyword Argument: %s', header_to_kwarg)
if out_file is None:
p = Path(excel_file)
out_file = p.with_name(f'{p.stem}.out.csv')
LOG.debug('Output File: %s', out_file.absolute())
def create_meeting(mtg_: RowType):
# use a separate session for each thread.
session = self._new_session()
return self.create_meeting(session=session, **mtg_)
if update_row is None:
update_row = dict.update
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# Start the create meeting operations and mark each future with its row index
future_to_row_idx = {executor.submit(create_meeting, mtg): i
for i, mtg in enumerate(meetings_to_create)}
for future in as_completed(future_to_row_idx):
idx = future_to_row_idx[future]
row = rows[idx]
try:
resp = future.result()
except Exception as exc:
LOG.error('[%d] %r generated an exception: %s', idx, row, exc)
else:
update_row(row, resp)
write_to_csv(out_file, rows)
[docs] def create_meeting(self, *,
session: requests.Session | None = None,
host_id: str | None = None,
host_email: str | None = None,
topic: str = 'My Meeting',
agenda: str = 'My Description',
start_time: datetime | str | None = None,
timezone: str | None = 'UTC',
type: Meeting | None = None,
**request_data) -> dict[str, Any]:
"""``POST /users/{userId}/meetings``: Use this API to create a meeting for a user.
Ref:
- https://marketplace.zoom.us/docs/api-reference/zoom-api/methods/#operation/meetingCreate
If a naive `datetime` object is provided for `start_time` or any other field
(i.e. one with no timezone information) the value for`timezone` will determine
which timezone to associate with the `datetime` object - defaults to *UTC* time
if not specified.
For a list of supported timezones, please see:
- https://marketplace.zoom.us/docs/api-reference/other-references/abbreviation-lists/#timezones
:return:
"""
if host_id:
user_id = host_id
elif host_email:
user_id = self._user_email_to_id_cached[host_email]
else:
user_id = 'me'
if agenda:
request_data['agenda'] = agenda
if topic:
request_data['topic'] = topic
if start_time:
request_data['start_time'] = start_time.isoformat() \
if isinstance(start_time, datetime) else start_time
if timezone:
request_data['timezone'] = timezone
if type:
request_data['type'] = type.value
r = self._post(f'https://api.zoom.us/v2/users/{user_id}/meetings', request_data, session)
r.raise_for_status()
return r.json()
@cached_property
def _user_email_to_id_cached(self):
return self.user_email_to_id(use_cache=True)
[docs] def user_email_to_id(self, status: str | None = 'active', *, use_cache=False):
filename = CACHE_DIR / f'users_{self._account_id}_{self._client_id}.json'
if use_cache:
users = read_json_file_if_exists(filename)
if users:
return users
users = self.list_users(status)['users']
email_to_id = {u['email']: u['id'] for u in users}
# save list of users to cache
save_json_file(filename, email_to_id)
return email_to_id
[docs] def list_users(self, status: str | None = 'active', page_size=300, page=1, all_pages=True):
# noinspection GrazieInspection
"""``GET /users``: Use this API to list your account's users.
Ref:
- https://marketplace.zoom.us/docs/api-reference/zoom-api/methods/#operation/users
:param status: The user's status, one of: active, inactive, pending
:param page_size: The number of records returned within a single API call.
:param page: The page number of the current page in the returned records.
:param all_pages: True to paginate and retrieve all records (pages).
:return: A `dict` object, where the `users` key contains a list of users.
"""
params = {'page_size': page_size, 'page_number': page}
if status:
params['status'] = status
res = self._get(API_USERS, params=params)
final_data = data = res.json()
if all_pages:
while data['page_count'] > data['page_number']:
params['page_number'] += 1
res = self._get(API_USERS, params=params)
data = res.json()
final_data['users'].extend(data['users'])
return final_data