Preface

No particular reason, I just thought there were so many girls on Instagram…

Main text

I. Analysis

1. Analyze the target website

First, analyze how the site loads photos. Taeri__taeri, you've probably heard of this influencer. Instagram only loads a certain number of photos at a time; scroll down and it loads more. No doubt about it: look at the XHR requests.

You can see the JSON data in the Preview tab; display_url is the link to the photo, and that's all we need.

2. Analyze the request parameters

Go back to the Headers tab and see what parameters the request carries. Just two: query_hash and variables.

variables is a JSON object containing id, first, and after. To save trouble, I'll just tell you what these three are:

id: the user's ID

first: the number of photos to load in this request

after: the end_cursor. This parameter marks where the previous page ended; without it, the first page is always loaded. Each page's response carries an end_cursor to use in the next page's request.
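Put together, the variables payload is just a JSON string in the query string. A minimal sketch of how the two parameters might be assembled (the hash value and user ID here are placeholders, not real values):

```python
import json

def build_pic_params(query_hash, user_id, first=50, end_cursor=''):
    """Build the query-string parameters for the GraphQL photo request."""
    variables = {'id': user_id, 'first': first, 'after': end_cursor}
    return {
        'query_hash': query_hash,  # fixed per query type
        'variables': json.dumps(variables, separators=(',', ':')),
    }

print(build_pic_params('<hash>', '12345')['variables'])
# {"id":"12345","first":50,"after":""}
```

An empty `after` requests the first page; pass the `end_cursor` from the previous response to get the next one.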

One more thing: you also need to add cookies to the request. I won't say much about that.

3. Program flow

Now that all the requests are analyzed, let's work out how to write the program. What's the requirement?

Given a username, grab all of that user's photos and download them into instagram// as fast as possible.

The requirement says "as fast as possible", so a single thread is too inefficient; a typical influencer has at least a few hundred photos. So asyncio is used here, and the flow looks like this:

There are two kinds of tasks: one fetches photo links, the others download them. While the link-fetching task is waiting on the network, the download tasks run; while a download is waiting, more links get fetched.
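This flow is the classic producer/consumer pattern on an asyncio.Queue. A stripped-down sketch with toy data instead of real Instagram requests:

```python
import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(f'url-{i}')  # pretend these are photo links
        await asyncio.sleep(0)       # yield so consumers can interleave

async def consumer(queue, results):
    while True:
        url = await queue.get()
        results.append(url)          # pretend to download the photo
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    results = []
    consumers = [asyncio.create_task(consumer(queue, results)) for _ in range(3)]
    await producer(queue)
    await queue.join()               # wait until every queued item is processed
    for c in consumers:
        c.cancel()
    return results

print(asyncio.run(main()))
```

While one coroutine awaits the network, the event loop runs the others; no threads needed.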

II. The code

Imports:

import asyncio
import json
import multiprocessing
import os
import re
import sys
from pathlib import Path
from urllib.parse import urljoin

import aiohttp
import requests

__init__:

def __init__(self, username, maxtasks=200):
    self.username = username
    self.maxtasks = maxtasks  # maximum number of concurrent tasks
    self.queue = asyncio.Queue(maxsize=maxtasks * 2)
    # configure a proxy; Instagram is unreachable here without one
    os.environ['http_proxy'] = PROXY
    os.environ['https_proxy'] = PROXY
    self.session = aiohttp.ClientSession(trust_env=True, headers=HEADERS)

Get the user ID first:

async def get_shared_data(self):
    """Fetch the profile page and pull out the window._sharedData JSON."""
    try:
        async with self.session.get(ROOT_URL + self.username) as resp:
            html = await resp.text()
            if html is not None and '_sharedData' in html:
                shared_data = html.split(
                    'window._sharedData = ')[1].split(';</script>')[0]
                if not shared_data:  # no shared data: terminate the program
                    print('!!!!!!!!!!!!!!!!')
                    exit(1)
                return json.loads(shared_data)
    except Exception:
        pass

async def init(self):
    """Initialize the necessary parameters."""
    user = (await self.get_shared_data())['entry_data']['ProfilePage'][0]['graphql']['user']
    if not user:
        print('user is none.')
        exit(1)
    self.user_id = user['id']  # user id
    self.count = user['edge_owner_to_timeline_media']['count']  # number of photos
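The split trick in get_shared_data can be checked against a dummy page. This sketch assumes the profile HTML embeds the data as `window._sharedData = {...};` inside a script tag:

```python
import json

def extract_shared_data(html):
    """Pull the window._sharedData JSON blob out of a profile page."""
    if html and '_sharedData' in html:
        return json.loads(
            html.split('window._sharedData = ')[1].split(';</script>')[0])
    return None

fake = '<script>window._sharedData = {"entry_data": {}};</script>'
print(extract_shared_data(fake))  # {'entry_data': {}}
```

A regex would work too, but two splits are hard to beat for readability here.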

Producer:

async def produce_download_urls(self, max=50):
    """Put the links of all photos on each page onto the queue.

    :param max: number of photos to fetch per request
    """
    end_cursor = ''  # an empty cursor loads the first page
    while True:
        pic_params = {
            'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',  # query_hash can be a fixed value
            'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(
                self.user_id, max, end_cursor),
        }
        pic_url = ROOT_URL + 'graphql/query/'
        async with self.session.get(pic_url, params=pic_params) as resp:
            data = await resp.json()
            edge_media = data['data']['user']['edge_owner_to_timeline_media']
            for edge in edge_media['edges']:
                await self.queue.put(edge['node']['display_url'])  # communicate via the queue
            # page_info carries has_next_page (true/false), telling us whether there is a next page
            if edge_media['page_info']['has_next_page']:
                end_cursor = edge_media['page_info']['end_cursor']  # cursor for the next request
            else:
                break
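The shape of the response the producer walks can be sketched with a dummy payload (field names taken from the analysis above, example URLs made up):

```python
data = {
    'data': {'user': {'edge_owner_to_timeline_media': {
        'edges': [
            {'node': {'display_url': 'https://example.com/a.jpg'}},
            {'node': {'display_url': 'https://example.com/b.jpg'}},
        ],
        'page_info': {'has_next_page': True, 'end_cursor': 'abc123'},
    }}}
}

edge_media = data['data']['user']['edge_owner_to_timeline_media']
urls = [edge['node']['display_url'] for edge in edge_media['edges']]
page_info = edge_media['page_info']
print(urls)                     # the display_url links to queue
print(page_info['end_cursor'])  # cursor to send as "after" next time
```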

Consumer:

async def download(self):
    """Download photos."""
    while not (self.producer.done() and self.queue.empty()):  # producer still running, or queue not empty
        url = await self.queue.get()  # get a photo link
        filename = PATH / url.split('?')[0].split('/')[-1]
        async with self.session.get(url) as resp:
            with filename.open('wb') as f:
                async for chunk in resp.content.iter_any():
                    f.write(chunk)
        self.queue.task_done()  # mark this queued url as downloaded
        print('.', end='', flush=True)
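Note that the `producer.done() and queue.empty()` check has a subtle race: a worker can check just before the producer finishes and then block forever on `queue.get()`. A common alternative is sentinel values; a toy sketch (not the code above, just the pattern):

```python
import asyncio

SENTINEL = None  # the producer pushes one per worker when it is finished

async def download_worker(queue, saved):
    while True:
        url = await queue.get()
        if url is SENTINEL:
            queue.task_done()
            break                 # clean shutdown, no race
        saved.append(url)         # stand-in for the actual aiohttp download
        queue.task_done()

async def demo():
    queue = asyncio.Queue()
    saved = []
    for u in ('u1', 'u2', 'u3'):
        await queue.put(u)
    workers = 2
    for _ in range(workers):
        await queue.put(SENTINEL)
    await asyncio.gather(*(download_worker(queue, saved) for _ in range(workers)))
    return saved

print(asyncio.run(demo()))  # ['u1', 'u2', 'u3']
```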

Run:

async def run(self):
    """Kick off the producer and the download workers."""
    print('Preparing...')
    print('Initializing...')
    await self.init()
    print('User id: %r.' % self.user_id)
    print('Total %r photos.' % self.count)
    print('-' * 50)
    self.producer = asyncio.create_task(self.produce_download_urls())
    print('Downloading...', end=' ', flush=True)
    await asyncio.gather(*(self.download() for _ in range(self.maxtasks)))  # asyncio.gather is similar to asyncio.wait

Check:

def check(_):
    """Watch the photo count... too much of a noob to know how to stop
    gracefully, so this will have to do (runs away).
    """
    print('Start check...')
    with requests.get(urljoin(ROOT_URL, USERNAME), headers=HEADERS,
                      proxies={'http': 'http://localhost:8001',
                               'https': 'https://localhost:8001'}) as resp:
        pattern = '"edge_owner_to_timeline_media":{"count":(.*?),"page_info"'
        count = int(re.findall(pattern, resp.text)[0])
        while True:
            files = len(os.listdir(PATH))
            print('Check files:%r' % files)
            if files == count:
                print('\nProduce done, Total %r photos, plz wait save done :)' % count)
                sys.exit(0)
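The regex in check can be sanity-checked against a snippet of profile HTML. The snippet below is a dummy with an assumed shape, not real Instagram output:

```python
import re

pattern = '"edge_owner_to_timeline_media":{"count":(.*?),"page_info"'
snippet = '..."edge_owner_to_timeline_media":{"count":321,"page_info":{...'
count = int(re.findall(pattern, snippet)[0])
print(count)  # 321
```

The braces are taken literally by the regex engine here, since they do not form a valid quantifier.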

Main:

async def main():
    ins = Instagram(USERNAME)
    try:
        await ins.run()
    finally:
        await ins.close()

if __name__ == '__main__':
    try:
        p = multiprocessing.Process(target=check, args=(0,))
        p.start()
        future = asyncio.ensure_future(main())
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future)
        loop.close()
    except KeyboardInterrupt:
        pass
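On Python 3.7+, the ensure_future / run_until_complete / close boilerplate can be collapsed into asyncio.run, which creates and closes the loop for you. A toy example (the coroutine here is a stand-in, not the scraper's main):

```python
import asyncio

async def main():
    await asyncio.sleep(0)  # stand-in for ins.run()
    return 'done'

print(asyncio.run(main()))  # prints done; the loop is created and closed for us
```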

Run:

Finally

Project address 🙂