Learning

asyncio

  • Concurrency (io-bound tasks)
    • Do something else while waiting for the response/result of another task
  • Event loop
    • asyncio.run()
  • Coroutine object
    • async keyword
  • Calling an async function only returns a coroutine object; none of the function body runs yet. Awaiting that object (or wrapping it in a task) is what actually runs it.
  • Tasks: asyncio.create_task(coroutine_object) wraps a coroutine in a Task and schedules it on the event loop, so several coroutines can run concurrently.
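  • asyncio.gather(coroutine_object_1, coroutine_object_2, ...) runs the awaitables concurrently and returns their results as a list, in the order they were passed in.
    • weaker at error handling: by default the first exception propagates to the caller, but the other awaitables are not cancelled and keep running, which can leave the program in an inconsistent state.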
  • Task group context manager asyncio.TaskGroup() (Python 3.11+) - built-in error handling: if one task fails, the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup. Often preferable to gather.
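import asyncio

async def fetch_data(task_id, sleep_time):
	# Stand-in for an I/O-bound call; the original fetch_data is not shown.
	await asyncio.sleep(sleep_time)
	return f"data from task {task_id}"

async def main():
	tasks = []
	async with asyncio.TaskGroup() as tg:
		for i, sleep_time in enumerate([2, 1, 3], start=1):
			task = tg.create_task(fetch_data(i, sleep_time))
			tasks.append(task) # Task object wrapping the coroutine
	# All tasks have finished once the TaskGroup block exits
	results = [task.result() for task in tasks]

	for result in results:
		print(result)

asyncio.run(main())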
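
The difference between calling and awaiting an async function, and what create_task adds, can be sketched as below. This is a minimal illustration, not a definitive pattern: fetch_data here is a hypothetical stand-in that only sleeps, and the delays are arbitrary.

```python
import asyncio
import time


async def fetch_data(task_id: int, delay: float) -> str:
    # Hypothetical stand-in for an I/O-bound call such as a network request.
    await asyncio.sleep(delay)
    return f"result {task_id}"


async def demo() -> tuple[bool, str, list[str], float]:
    # Calling the async function only builds a coroutine object; nothing runs yet.
    coro = fetch_data(1, 0.05)
    is_coroutine = asyncio.iscoroutine(coro)

    # Awaiting the coroutine object runs it to completion.
    first = await coro

    # create_task() schedules the coroutines on the event loop right away,
    # so the two sleeps below overlap instead of running back to back.
    start = time.perf_counter()
    task_a = asyncio.create_task(fetch_data(2, 0.2))
    task_b = asyncio.create_task(fetch_data(3, 0.2))
    both = [await task_a, await task_b]
    elapsed = time.perf_counter() - start

    return is_coroutine, first, both, elapsed
```

Running asyncio.run(demo()) should show an elapsed time close to one delay (about 0.2 s) rather than the sum of both, since the two tasks wait at the same time.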

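The gather error-handling caveat from the list above can be made visible with a small sketch. The worker coroutine, its names and its delays are assumptions made up for this illustration. It also shows return_exceptions=True, a real gather parameter that returns exceptions in the result list instead of raising the first one.

```python
import asyncio

finished = []  # records which workers ran to completion


async def worker(name: str, delay: float, fail: bool = False) -> str:
    # Hypothetical worker: sleeps, then either fails or records that it finished.
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    finished.append(name)
    return name


async def default_gather() -> list[str]:
    finished.clear()
    try:
        await asyncio.gather(worker("a", 0.05, fail=True), worker("b", 0.1))
    except ValueError:
        # gather() re-raised the first exception, but "b" was not cancelled.
        await asyncio.sleep(0.15)  # give "b" time to finish in the background
    return list(finished)


async def collecting_gather() -> list:
    # With return_exceptions=True, an exception takes the place of the result
    # at the same position as the awaitable that raised it.
    return await asyncio.gather(
        worker("a", 0.05, fail=True),
        worker("b", 0.1),
        return_exceptions=True,
    )
```

In default_gather, worker "b" still completes after the failure has already been reported, which is the inconsistent state the note warns about.
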
Synchronization

asyncio.Lock()

import asyncio

shared_resource = 0
lock = asyncio.Lock()

async def modify_shared_resource():
	global shared_resource
	async with lock:
		# critical section
		shared_resource += 1
		await asyncio.sleep(1) # simulating working with the resource.

async def main():
	await asyncio.gather(*(modify_shared_resource() for _ in range(5)))

asyncio.run(main())
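
Why a lock matters is easier to see when the update is split across an await. The sketch below is an illustration under assumptions: the state dict and both increment functions are hypothetical, and asyncio.sleep(0) is used only to force a switch to other tasks in the middle of the read-modify-write.

```python
import asyncio


async def unsafe_increment(state: dict) -> None:
    current = state["value"]      # read
    await asyncio.sleep(0)        # yields to the event loop mid-update
    state["value"] = current + 1  # write back a possibly stale value


async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    # Only one task at a time can be inside this block.
    async with lock:
        current = state["value"]
        await asyncio.sleep(0)
        state["value"] = current + 1


async def run_demo(n: int = 5) -> tuple[int, int]:
    unsafe = {"value": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))

    return unsafe["value"], safe["value"]
```

Without the lock, updates are lost and the final count falls short of n; with the lock, every increment is applied.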

asyncio.Semaphore(n)

import asyncio

async def access_resource(semaphore, resource_id):
	async with semaphore:
		# simulating accessing a limited resource
		await asyncio.sleep(1) # simulating work with the resource

async def main():
	semaphore = asyncio.Semaphore(2) # allow 2 concurrent accesses
	await asyncio.gather(*(access_resource(semaphore, i) for i in range(5)))

asyncio.run(main())
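
To check that a semaphore really caps concurrency, one option is to count how many coroutines hold a slot at the same time. This is a rough sketch: the stats dict and measure_peak helper are hypothetical names introduced for the illustration.

```python
import asyncio


async def access_resource(semaphore: asyncio.Semaphore, stats: dict) -> None:
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)  # simulated work while holding a slot
        stats["active"] -= 1


async def measure_peak(limit: int = 2, workers: int = 5) -> int:
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(access_resource(semaphore, stats) for _ in range(workers))
    )
    return stats["peak"]
```

With a limit of 2 and 5 workers, the observed peak should never exceed 2, even though all five coroutines are started together.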

asyncio.Event()

An event can be set, and coroutines can wait for it to be set. It blocks sections of code until the event is set (essentially a boolean flag that coroutines can wait on).

import asyncio

async def waiter(event):
	print("waiting for the event to be set")
	await event.wait()
	print("event has been set, continuing execution")

async def setter(event):
	await asyncio.sleep(2) # simulate doing some work
	event.set()
	print("event has been set!")

async def main():
	event = asyncio.Event()
	await asyncio.gather(waiter(event), setter(event))

asyncio.run(main())
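
A single set() releases every coroutine waiting on the event, and clear() resets the flag so later wait() calls block again. The sketch below illustrates this with three waiters; the waiter names and the log list are assumptions added for the example.

```python
import asyncio


async def waiter(name: str, event: asyncio.Event, log: list) -> None:
    await event.wait()  # blocks until the flag is set
    log.append(name)


async def run_demo() -> tuple[list, bool, bool, bool]:
    event = asyncio.Event()
    log: list = []
    waiters = [
        asyncio.create_task(waiter(f"w{i}", event, log)) for i in range(3)
    ]

    await asyncio.sleep(0.05)  # the waiters are now parked on event.wait()
    nothing_ran_yet = log == []

    event.set()                # wakes every waiter, not just one
    await asyncio.gather(*waiters)
    was_set = event.is_set()

    event.clear()              # reset the flag for reuse
    return sorted(log), nothing_ran_yet, was_set, event.is_set()
```

None of the waiters proceeds until set() is called, and after clear() the flag reads as unset again.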